You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2023/05/12 03:58:13 UTC

[plc4x] 02/02: feat(plc4py): Finish unsigned integer writes for ByteBuffer

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 5ae793ea3c5c9aa6cf067d48816c0d36f88734c0
Author: Ben Hutcheson <be...@gmail.com>
AuthorDate: Fri May 12 05:52:06 2023 +0200

    feat(plc4py): Finish unsigned integer writes for ByteBuffer
---
 .../plc4py/plc4py/spi/generation/WriteBuffer.py    | 68 ++++++++++------
 .../tests/unit/plc4py/spi/test_write_buffer.py     | 92 ++++++++++++++++------
 2 files changed, 112 insertions(+), 48 deletions(-)

diff --git a/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py b/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
index 4a7ee42144..cb1d3cece1 100644
--- a/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
+++ b/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
@@ -14,8 +14,7 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
-
-from ctypes import *
+from ctypes import c_bool, c_byte, c_ubyte, c_uint16, c_uint32, c_uint64, c_int16, c_int32, c_int64, c_float, c_double
 from dataclasses import dataclass
 from typing import List
 
@@ -127,12 +126,12 @@ class WriteBufferByteBased(WriteBuffer):
         self.write_signed_byte(value, logical_name, **kwargs)
 
     def write_byte_array(self, value: List[c_byte], logical_name: str = "", **kwargs) -> None:
-        for aByte in value:
-            self.write_signed_byte(aByte, logical_name, **kwargs)
+        for a_byte in value:
+            self.write_signed_byte(a_byte, logical_name, **kwargs)
 
     def write_unsigned_byte(self, value: c_byte, bit_length: int = 8, logical_name: str = "", **kwargs) -> None:
         if bit_length <= 0:
-            raise SerializationException("unsigned byte can only contain max 8 bits")
+            raise SerializationException("unsigned byte must contain at least 1 bit")
         elif bit_length > 8:
             raise SerializationException("unsigned byte can only contain max 8 bits")
         else:
@@ -143,25 +142,46 @@ class WriteBufferByteBased(WriteBuffer):
 
     def write_unsigned_short(self, value: c_uint16, bit_length: int = 16, logical_name: str = "", **kwargs) -> None:
         if bit_length <= 0:
-            raise SerializationException("unsigned short can only contain max 8 bits")
+            raise SerializationException("unsigned short must contain at least 1 bit")
         elif bit_length > 16:
             raise SerializationException("unsigned short can only contain max 16 bits")
         else:
-            value_encoding: str = kwargs.get("encoding", "default")
-            if value_encoding == "ASCII":
-                if bit_length % 8 != 0:
-                    raise SerializationException("'ASCII' encoded fields must have a length that is a multiple of 8 bits long")
-                char_len: int = int(bit_length / 8)
-                max_value: int = int(10**char_len - 1)
-                if value.value > max_value:
-                    raise SerializationException("Provided value of " + str(value) + " exceeds the max value of " + str(max_value))
-                string_value: str = "{}".format(value.value)
-                src = bitarray(endian=ByteOrder.get_short_name(self.byte_order))
-                src.frombytes(bytearray(string_value, value_encoding))
-                self.bb[self.position:bit_length] = src[:bit_length]
-                self.position += bit_length
-            elif value_encoding == "default":
-                src = bitarray(endian=ByteOrder.get_short_name(self.byte_order))
-                src.frombytes(value)
-                self.bb[self.position:bit_length] = src[:bit_length]
-                self.position += bit_length
+            self._handle_integer_encoding(c_uint16(value.value), bit_length, **kwargs)
+
+    def write_unsigned_int(self, value: c_uint32, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("unsigned int must contain at least 1 bit")
+        elif bit_length > 32:
+            raise SerializationException("unsigned int can only contain max 32 bits")
+        else:
+            self._handle_integer_encoding(c_uint32(value.value), bit_length, **kwargs)
+
+    def write_unsigned_long(self, value: c_uint64, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("unsigned long must contain at least 1 bit")
+        elif bit_length > 64:
+            raise SerializationException("unsigned long can only contain max 16 bits")
+        else:
+            self._handle_integer_encoding(c_uint64(value.value), bit_length, **kwargs)
+
+    def _handle_integer_encoding(self, value, bit_length: int, **kwargs):
+        value_encoding: str = kwargs.get("encoding", "default")
+        if value_encoding == "ASCII":
+            if bit_length % 8 != 0:
+                raise SerializationException(
+                    "'ASCII' encoded fields must have a length that is a multiple of 8 bits long")
+            char_len: int = int(bit_length / 8)
+            max_value: int = int(10 ** char_len - 1)
+            if value.value > max_value:
+                raise SerializationException(
+                    "Provided value of " + str(value) + " exceeds the max value of " + str(max_value))
+            string_value: str = "{}".format(value.value)
+            src = bitarray(endian=ByteOrder.get_short_name(self.byte_order))
+            src.frombytes(bytearray(string_value, value_encoding))
+            self.bb[self.position:bit_length] = src[:bit_length]
+            self.position += bit_length
+        elif value_encoding == "default":
+            src = bitarray(endian=ByteOrder.get_short_name(self.byte_order))
+            src.frombytes(value)
+            self.bb[self.position:bit_length] = src[:bit_length]
+            self.position += bit_length
diff --git a/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py b/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
index c0acae12ee..17d8479374 100644
--- a/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
+++ b/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-from ctypes import c_bool, c_byte, c_uint16
+from ctypes import c_bool, c_byte, c_uint16, c_uint64
 
 import pytest
 from bitarray import bitarray
@@ -40,94 +40,94 @@ def test_write_buffer_get_pos(mocker) -> None:
 
 def test_write_buffer_set_bit_little_endian(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
-    wb.write_bit(c_bool(True), "Test String")
+    wb.write_bit(c_bool(True))
     ba: memoryview = wb.get_bytes()
     assert(b'\x01' == ba.tobytes())
 
 
 def test_write_buffer_set_bit_big_endian(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
-    wb.write_bit(c_bool(True), "Test String")
+    wb.write_bit(c_bool(True))
     ba: memoryview = wb.get_bytes()
     assert(b'\x80' == ba.tobytes())
 
 
 def test_write_buffer_set_bit_get_pos(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
-    wb.write_bit(c_bool(True), "Test String")
+    wb.write_bit(c_bool(True))
     pos: int = wb.get_pos()
     assert(1 == pos)
 
 
 def test_write_buffer_set_bit_x2_little_endian(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
-    wb.write_bit(c_bool(True), "Test String 1")
-    wb.write_bit(c_bool(True), "Test String 2")
+    wb.write_bit(c_bool(True))
+    wb.write_bit(c_bool(True))
     ba: memoryview = wb.get_bytes()
     assert (b'\x03' == ba.tobytes())
 
 
 def test_write_buffer_set_bit_x2_big_endian(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
-    wb.write_bit(c_bool(True), "Test String 1")
-    wb.write_bit(c_bool(True), "Test String 2")
+    wb.write_bit(c_bool(True))
+    wb.write_bit(c_bool(True))
     ba: memoryview = wb.get_bytes()
     assert (b'\xc0' == ba.tobytes())
 
 
 def test_write_buffer_set_bit_get_pos_dual(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
-    wb.write_bit(c_bool(True), "Test String 1")
-    wb.write_bit(c_bool(True), "Test String 2")
+    wb.write_bit(c_bool(True))
+    wb.write_bit(c_bool(True))
     pos: int = wb.get_pos()
     assert(pos == 2)
 
 
 def test_write_buffer_set_unsigned_byte_get_pos(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
-    wb.write_unsigned_byte(c_byte(0xFF), 8, "Test String 1")
+    wb.write_unsigned_byte(c_byte(0xFF), 8)
     pos: int = wb.get_pos()
     assert(pos == 8)
 
 
 def test_write_buffer_set_unsigned_byte_little_endian(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
-    wb.write_unsigned_byte(c_byte(0x12), 8, "Test String 1")
+    wb.write_unsigned_byte(c_byte(0x12), 8)
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("01001000"))
 
 
 def test_write_buffer_set_unsigned_byte_big_endian(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
-    wb.write_unsigned_byte(c_byte(0x12), 8, "Test String 1")
+    wb.write_unsigned_byte(c_byte(0x12), 8)
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("00010010"))
 
 
 def test_write_buffer_set_unsigned_byte_little_endian_niblet(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
-    wb.write_unsigned_byte(c_byte(0x12), 4, "Test String 1")
+    wb.write_unsigned_byte(c_byte(0x12), 4)
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("01000000"))
 
 
 def test_write_buffer_set_unsigned_byte_big_endian_niblet(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
-    wb.write_unsigned_byte(c_byte(0x12), 4, "Test String 1")
+    wb.write_unsigned_byte(c_byte(0x12), 4)
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("00010000"))
 
 
 def test_write_buffer_write_unsigned_short_little_endian(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.LITTLE_ENDIAN)
-    wb.write_unsigned_short(c_uint16(0x12), 16, "Test String 1")
+    wb.write_unsigned_short(c_uint16(0x12), 16)
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("01001000 00000000", endian="little"))
 
 
 def test_write_buffer_write_unsigned_short_big_endian(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.BIG_ENDIAN)
-    wb.write_unsigned_short(c_uint16(0x12), 16, "Test String 1")
+    wb.write_unsigned_short(c_uint16(0x12), 16)
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("00010010 00000000", endian="big"))
 
@@ -142,24 +142,24 @@ def test_write_buffer_write_unsigned_short_little_endian_dual(mocker) -> None:
 
 def test_write_buffer_write_unsigned_short_big_endian_dual(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.BIG_ENDIAN)
-    wb.write_unsigned_short(c_uint16(0x12), 16, "Test String 1")
-    wb.write_unsigned_short(c_uint16(0x34), 16, "Test String 1")
+    wb.write_unsigned_short(c_uint16(0x12), 16)
+    wb.write_unsigned_short(c_uint16(0x34), 16)
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("00010010 00000000 00110100 00000000", endian="big"))
 
 
 def test_write_buffer_write_unsigned_short_big_endian_full(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.BIG_ENDIAN)
-    wb.write_unsigned_short(c_uint16(-1), 16, "Test String 1")
+    wb.write_unsigned_short(c_uint16(-1), 16)
     ba: memoryview = wb.get_bytes()
-    assert (ba.obj == bitarray("11111111 11111111", endian="bit"))
+    assert (ba.obj == bitarray("11111111 11111111", endian="big"))
 
 
 def test_write_buffer_write_unsigned_short_bit_big_endian_full(mocker) -> None:
     wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.BIG_ENDIAN)
-    wb.write_bit(c_bool(True), "Test String 1")
-    wb.write_bit(c_bool(False), "Test String 1")
-    wb.write_unsigned_short(c_uint16(-1), 16, "Test String 1")
+    wb.write_bit(c_bool(True))
+    wb.write_bit(c_bool(False))
+    wb.write_unsigned_short(c_uint16(-1), 16)
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("10 11111111 11111111", endian="big"))
 
@@ -176,3 +176,47 @@ def test_write_buffer_write_unsigned_short_ascii_encoding_big_endian(mocker) ->
     wb.write_unsigned_short(c_uint16(1), 16, "ASCII Value of 1 - 0x31", encoding="ASCII")
     ba: memoryview = wb.get_bytes()
     assert (ba.obj == bitarray("00110001", endian="big"))
+
+
+def test_write_buffer_write_unsigned_int_little_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.LITTLE_ENDIAN)
+    # Testing incompatible format being sent to it.
+    wb.write_unsigned_int(c_uint16(0x12), 32)
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("01001000 00000000 00000000 00000000", endian="little"))
+
+
+def test_write_buffer_write_unsigned_int_big_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.BIG_ENDIAN)
+    # Testing incompatible format being sent to it.
+    wb.write_unsigned_int(c_uint16(0x12), 32)
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("00010010 00000000 00000000 00000000", endian="big"))
+
+
+def test_write_buffer_write_unsigned_long_little_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.LITTLE_ENDIAN)
+    wb.write_unsigned_long(c_uint64(0x12), 64)
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("01001000 00000000 00000000 00000000 00000000 00000000 00000000 00000000", endian="little"))
+
+
+def test_write_buffer_write_unsigned_long_big_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(2, ByteOrder.BIG_ENDIAN)
+    wb.write_unsigned_long(c_uint64(0x12), 64)
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("00010010 00000000 00000000 00000000 00000000 00000000 00000000 00000000", endian="big"))
+
+
+def test_write_buffer_write_unsigned_long_ascii_encoding_little_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(8, ByteOrder.LITTLE_ENDIAN)
+    wb.write_unsigned_long(c_uint64(11111111), 64, "ASCII Value of 1111 1111 - 0x3131313131313131", encoding="ASCII")
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("10001100 10001100 10001100 10001100 10001100 10001100 10001100 10001100", endian="little"))
+
+
+def test_write_buffer_write_unsigned_short_ascii_encoding_big_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(8, ByteOrder.BIG_ENDIAN)
+    wb.write_unsigned_long(c_uint64(11111111), 64, "ASCII Value of 1111 1111 - 0x3131313131313131", encoding="ASCII")
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("00110001 00110001 00110001 00110001 00110001 00110001 00110001 00110001", endian="big"))