You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2023/05/13 04:54:27 UTC

[plc4x] branch develop updated: feat(plc4py): Finish WriteBufferByteBased

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1c3fa84cf3 feat(plc4py): Finish WriteBufferByteBased
1c3fa84cf3 is described below

commit 1c3fa84cf356b9a2364220313e1d10798cb1d8b3
Author: Ben Hutcheson <be...@gmail.com>
AuthorDate: Sat May 13 06:21:42 2023 +0200

    feat(plc4py): Finish WriteBufferByteBased
---
 .../plc4py/plc4py/spi/generation/WriteBuffer.py    | 69 ++++++++++++++++------
 .../tests/unit/plc4py/spi/test_write_buffer.py     | 16 ++++-
 2 files changed, 67 insertions(+), 18 deletions(-)

diff --git a/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py b/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
index 3fca938f92..187d1aa0af 100644
--- a/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
+++ b/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
@@ -15,9 +15,9 @@
 #  specific language governing permissions and limitations
 #  under the License.
 from ctypes import c_bool, c_byte, c_ubyte, c_uint16, c_uint32, c_uint64, c_int16, c_int32, c_int64, c_float, c_double, \
-    c_int8
+    c_int8, c_uint8
 from dataclasses import dataclass
-from typing import List
+from typing import List, Union
 
 from bitarray import bitarray
 from bitarray.util import zeros
@@ -105,6 +105,9 @@ class WriteBufferByteBased(WriteBuffer):
     position: int = 0
     bb: bitarray
 
+    NUMERIC_UNION = Union[c_byte, c_uint8, c_uint16, c_uint32, c_uint64, c_int8, c_int16, c_int32, c_int64, c_float, c_double]
+
+
     def __init__(self, size: int, byte_order: ByteOrder):
         self.bb = zeros(size * 8, endian=ByteOrder.get_short_name(byte_order))
         self.byte_order = byte_order
@@ -136,7 +139,7 @@ class WriteBufferByteBased(WriteBuffer):
         elif bit_length > 8:
             raise SerializationException("unsigned byte can only contain max 8 bits")
         else:
-            self._handle_integer_encoding(c_byte(value.value), bit_length, **kwargs)
+            self._handle_numeric_encoding(c_byte(value.value), bit_length, **kwargs)
 
     def write_unsigned_short(self, value: c_uint16, bit_length: int = 16, logical_name: str = "", **kwargs) -> None:
         if bit_length <= 0:
@@ -144,7 +147,7 @@ class WriteBufferByteBased(WriteBuffer):
         elif bit_length > 16:
             raise SerializationException("unsigned short can only contain max 16 bits")
         else:
-            self._handle_integer_encoding(c_uint16(value.value), bit_length, **kwargs)
+            self._handle_numeric_encoding(c_uint16(value.value), bit_length, **kwargs)
 
     def write_unsigned_int(self, value: c_uint32, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
         if bit_length <= 0:
@@ -152,7 +155,7 @@ class WriteBufferByteBased(WriteBuffer):
         elif bit_length > 32:
             raise SerializationException("unsigned int can only contain max 32 bits")
         else:
-            self._handle_integer_encoding(c_uint32(value.value), bit_length, **kwargs)
+            self._handle_numeric_encoding(c_uint32(value.value), bit_length, **kwargs)
 
     def write_unsigned_long(self, value: c_uint64, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
         if bit_length <= 0:
@@ -160,9 +163,51 @@ class WriteBufferByteBased(WriteBuffer):
         elif bit_length > 64:
             raise SerializationException("unsigned long can only contain max 16 bits")
         else:
-            self._handle_integer_encoding(c_uint64(value.value), bit_length, **kwargs)
+            self._handle_numeric_encoding(c_uint64(value.value), bit_length, **kwargs)
+
+    def write_signed_byte(self, value: c_int8, bit_length: int = 8, logical_name: str = "", **kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("Signed byte must contain at least 1 bit")
+        elif bit_length > 8:
+            raise SerializationException("Signed byte can only contain max 8 bits")
+        self._handle_numeric_encoding(c_int8(value.value), bit_length, **kwargs)
+
+    def write_short(self, value: c_int16, bit_length: int = 16, logical_name: str = "", **kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("Signed short must contain at least 1 bit")
+        elif bit_length > 16:
+            raise SerializationException("Signed short can only contain max 16 bits")
+        self._handle_numeric_encoding(c_int16(value.value), bit_length, **kwargs)
+
+    def write_int(self, value: c_int32, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("Signed int must contain at least 1 bit")
+        elif bit_length > 32:
+            raise SerializationException("Signed int can only contain max 32 bits")
+        self._handle_numeric_encoding(c_int32(value.value), bit_length, **kwargs)
+
+    def write_long(self, value: c_int64, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("Signed long must contain at least 1 bit")
+        elif bit_length > 64:
+            raise SerializationException("Signed long can only contain max 64 bits")
+        self._handle_numeric_encoding(c_int64(value.value), bit_length, **kwargs)
 
-    def _handle_integer_encoding(self, value, bit_length: int, **kwargs):
+    def write_float(self, value: c_float, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("Float must contain at least 1 bit")
+        elif bit_length > 32:
+            raise SerializationException("Float can only contain max 32 bits")
+        self._handle_numeric_encoding(c_float(value.value), bit_length, **kwargs)
+
+    def write_double(self, value: c_double, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("Double must contain at least 1 bit")
+        elif bit_length > 64:
+            raise SerializationException("Double can only contain max 64 bits")
+        self._handle_numeric_encoding(c_double(value.value), bit_length, **kwargs)
+
+    def _handle_numeric_encoding(self, value: NUMERIC_UNION, bit_length: int, **kwargs):
         value_encoding: str = kwargs.get("encoding", "default")
         if value_encoding == "ASCII":
             if bit_length % 8 != 0:
@@ -183,13 +228,3 @@ class WriteBufferByteBased(WriteBuffer):
             src.frombytes(value)
             self.bb[self.position:bit_length] = src[:bit_length]
             self.position += bit_length
-
-    def write_signed_byte(self, value: c_int8, bit_length: int = 8, logical_name: str = "", **kwargs) -> None:
-        if bit_length <= 0:
-            raise SerializationException("Signed byte must contain at least 1 bit")
-        elif bit_length > 8:
-            raise SerializationException("Signed byte can only contain max 8 bits")
-        src = bitarray(endian=ByteOrder.get_short_name(self.byte_order))
-        src.frombytes(value)
-        self.bb[self.position:bit_length] = src[:bit_length]
-        self.position += bit_length
diff --git a/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py b/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
index ab51bd85d2..af25806dfb 100644
--- a/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
+++ b/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-from ctypes import c_bool, c_byte, c_uint16, c_uint64, c_int8
+from ctypes import c_bool, c_byte, c_uint16, c_uint64, c_int8, c_float
 
 import pytest
 from bitarray import bitarray
@@ -236,3 +236,17 @@ def test_write_buffer_set_signed_byte_three(mocker) -> None:
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("11000000",
                                endian="little"))
+
+def test_write_buffer_set_float_little_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+    wb.write_float(c_float(-1), 32)
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("00000000000000000000000111111101",
+                               endian="little"))
+
+def test_write_buffer_set_float_big_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+    wb.write_float(c_float(-1), 32)
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("00000000000000001000000010111111",
+                               endian="big"))