You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2023/05/11 03:58:55 UTC

[plc4x] branch develop updated: feat(plc4py): WriteBuffer interface and WriteBufferByteBased implementation

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 99ed37c863 feat(plc4py): WriteBuffer interface and WriteBufferByteBased implementation
99ed37c863 is described below

commit 99ed37c86330a2ef0421d9ed81e918cce87ceb42
Author: Ben Hutcheson <be...@gmail.com>
AuthorDate: Thu May 11 05:53:53 2023 +0200

    feat(plc4py): WriteBuffer interface and WriteBufferByteBased implementation
---
 sandbox/plc4py/plc4py/api/exceptions/exceptions.py |   4 +
 .../plc4py/plc4py/spi/generation/WriteBuffer.py    |  94 +++++++++++++---
 sandbox/plc4py/plc4py/spi/values/Common.py         |   7 ++
 sandbox/plc4py/plc4py/spi/values/common.py         |  23 ----
 sandbox/plc4py/plc4py/utils/GenericTypes.py        |   7 ++
 sandbox/plc4py/setup.py                            |   1 +
 .../tests/unit/plc4py/spi/test_write_buffer.py     | 118 +++++++++++++++++++++
 7 files changed, 217 insertions(+), 37 deletions(-)

diff --git a/sandbox/plc4py/plc4py/api/exceptions/exceptions.py b/sandbox/plc4py/plc4py/api/exceptions/exceptions.py
index c42fabafa7..9082844959 100644
--- a/sandbox/plc4py/plc4py/api/exceptions/exceptions.py
+++ b/sandbox/plc4py/plc4py/api/exceptions/exceptions.py
@@ -33,3 +33,7 @@ class PlcFieldParseException(Exception):
 
 class PlcNotImplementedException(Exception):
     pass
+
+
+class SerializationException(Exception):
+    pass
diff --git a/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py b/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
index 63d4d5969c..a04d71af51 100644
--- a/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
+++ b/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
@@ -19,7 +19,10 @@ from ctypes import *
 from dataclasses import dataclass
 from typing import List
 
-from plc4py.spi.values.common import Serializable
+from bitarray import bitarray
+from bitarray.util import zeros
+
+from plc4py.api.exceptions.exceptions import SerializationException
 from plc4py.utils.GenericTypes import ByteOrder, ByteOrderAware
 
 
@@ -46,42 +49,42 @@ class WriteBuffer(ByteOrderAware, PositionAware):
         raise NotImplementedError
 
     def write_byte(self, value: c_byte, logical_name: str = "", **kwargs) -> None:
-        self.write_signed_byte(value, logical_name, **kwargs)
+        self.write_signed_byte(value, 8, logical_name, **kwargs)
 
     def write_byte_array(self, value: List[c_byte], logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_unsigned_byte(self, value: c_ubyte, logical_name: str = "", **kwargs) -> None:
+    def write_unsigned_byte(self, value: c_ubyte, bit_length: int = 8, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_unsigned_short(self, value: c_uint16, logical_name: str = "", **kwargs) -> None:
+    def write_unsigned_short(self, value: c_uint16, bit_length: int = 16, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_unsigned_int(self, value: c_uint32, logical_name: str = "", **kwargs) -> None:
+    def write_unsigned_int(self, value: c_uint32, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_unsigned_long(self, value: c_uint64, logical_name: str = "", **kwargs) -> None:
+    def write_unsigned_long(self, value: c_uint64, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_signed_byte(self, value: c_byte, logical_name: str = "", **kwargs) -> None:
+    def write_signed_byte(self, value: c_byte, bit_length: int = 8, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_short(self, value: c_int16, logical_name: str = "", **kwargs) -> None:
+    def write_short(self, value: c_int16, bit_length: int = 16, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_int(self, value: c_int32, logical_name: str = "", **kwargs) -> None:
+    def write_int(self, value: c_int32, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_long(self, value: c_int64, logical_name: str = "", **kwargs) -> None:
+    def write_long(self, value: c_int64, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_float(self, value: c_float, logical_name: str = "", **kwargs) -> None:
+    def write_float(self, value: c_float, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_double(self, value: c_double, logical_name: str = "", **kwargs) -> None:
+    def write_double(self, value: c_double, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
-    def write_str(self, value: str, logical_name: str = "", bit_length: int = -1, **kwargs) -> None:
+    def write_str(self, value: str, bit_length: int = -1, logical_name: str = "", **kwargs) -> None:
         raise NotImplementedError
 
     def write_virtual(self, value: str, logical_name: str = "", **kwargs) -> None:
@@ -93,5 +96,68 @@ class WriteBuffer(ByteOrderAware, PositionAware):
     # @param value the value to be serialized
     # @throws SerializationException if something goes wrong
     #
-    def write_serializable(self, value: Serializable) -> None:
+    def write_serializable(self, value) -> None:
         value.serialize(self)
+
+
+class WriteBufferByteBased(WriteBuffer):
+    byte_order: ByteOrder
+    position: int = 0
+    bb: bitarray
+
+    def __init__(self, size: int, byte_order: ByteOrder):
+        self.bb = zeros(size * 8, endian=ByteOrder.get_short_name(byte_order))
+        self.byte_order = byte_order
+
+    def get_bytes(self) -> memoryview:
+        return memoryview(self.bb)
+
+    def get_pos(self) -> int:
+        return self.position
+
+    def push_context(self, logical_name: str, **kwargs) -> None:
+        # byte buffer need no context handling
+        pass
+
+    def write_bit(self, value: c_bool, logical_name: str = "", *kwargs) -> None:
+        self.bb[self.position] = bool(value)
+        self.position += 1
+
+    def write_byte(self, value: c_byte, logical_name: str = "", *kwargs) -> None:
+        self.write_signed_byte(value, logical_name, **kwargs)
+
+    def write_byte_array(self, value: List[c_byte], logical_name: str = "", *kwargs) -> None:
+        for aByte in value:
+            self.write_signed_byte(aByte, logical_name, **kwargs)
+
+    def write_unsigned_byte(self, value: c_byte, bit_length: int = 8, logical_name: str = "", *kwargs) -> None:
+        if bit_length <= 0:
+            raise SerializationException("unsigned byte can only contain max 8 bits")
+        elif bit_length > 8:
+            raise SerializationException("unsigned byte can only contain max 8 bits")
+        else:
+            src = bitarray(endian=ByteOrder.get_short_name(self.byte_order))
+            src.frombytes(value)
+            self.bb[self.position:bit_length] = src[:bit_length]
+            self.position += bit_length
+
+    # def write_unsigned_short(self, value: c_uint16, bit_length: int = 16, logical_name: str = "", *kwargs) -> None:
+    #     if bit_length <= 0:
+    #         raise SerializationException("unsigned short can only contain max 8 bits")
+    #     elif bit_length > 8:
+    #         raise SerializationException("unsigned short can only contain max 16 bits")
+    #     else:
+    #         self.bb.append(bytes(value))
+    #         value_encoding: str = kwargs.get("encoding", "default")
+    #         if value_encoding == "ASCII":
+    #             if bit_length % 8 != 0:
+    #                 raise SerializationException("'ASCII' encoded fields must have a length that is a multiple of 8 bits long")
+    #             char_len: int = int(bit_length / 8)
+    #             max_value: int = int(10**char_len - 1)
+    #             if value > max_value:
+    #                 raise SerializationException("Provided value of " + str(value) + " exceeds the max value of " + str(max_value))
+    #             string_value: str = "{}".format(value)
+    #             self.bb.append(string_value)
+    #         elif value_encoding == "default":
+    #             self.bb[self.position:bit_length] = value
+    #             self.postion += bit_length
diff --git a/sandbox/plc4py/plc4py/spi/values/Common.py b/sandbox/plc4py/plc4py/spi/values/Common.py
new file mode 100644
index 0000000000..77a7351cdf
--- /dev/null
+++ b/sandbox/plc4py/plc4py/spi/values/Common.py
@@ -0,0 +1,7 @@
+from plc4py.spi.generation import WriteBuffer
+
+
+class Serializable:
+
+    def serialize(self, write_buffer: WriteBuffer):
+        pass
\ No newline at end of file
diff --git a/sandbox/plc4py/plc4py/spi/values/common.py b/sandbox/plc4py/plc4py/spi/values/common.py
deleted file mode 100644
index c5ca06e1a6..0000000000
--- a/sandbox/plc4py/plc4py/spi/values/common.py
+++ /dev/null
@@ -1,23 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing,
-#  software distributed under the License is distributed on an
-#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-#  KIND, either express or implied.  See the License for the
-#  specific language governing permissions and limitations
-#  under the License.
-from plc4py.spi.generation.WriteBuffer import WriteBuffer
-
-
-class Serializable:
-
-    def serialize(self, write_buffer: WriteBuffer):
-        pass
diff --git a/sandbox/plc4py/plc4py/utils/GenericTypes.py b/sandbox/plc4py/plc4py/utils/GenericTypes.py
index 9b46d976dc..c1fb8ed5e1 100644
--- a/sandbox/plc4py/plc4py/utils/GenericTypes.py
+++ b/sandbox/plc4py/plc4py/utils/GenericTypes.py
@@ -49,6 +49,13 @@ class ByteOrder(Enum):
         obj._value_ = value
         return obj
 
+    @staticmethod
+    def get_short_name(order):
+        if order == ByteOrder.LITTLE_ENDIAN:
+            return "little"
+        elif order == ByteOrder.BIG_ENDIAN:
+            return "big"
+
 
 @dataclass
 class ByteOrderAware:
diff --git a/sandbox/plc4py/setup.py b/sandbox/plc4py/setup.py
index 6bc919296f..6543000684 100644
--- a/sandbox/plc4py/setup.py
+++ b/sandbox/plc4py/setup.py
@@ -44,6 +44,7 @@ setup(
         "black",
         "pip",
         "aenum",
+        "bitarray"
     ],
     extras_require={
         "dev": [
diff --git a/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py b/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
new file mode 100644
index 0000000000..3a2284dc8a
--- /dev/null
+++ b/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from ctypes import c_bool, c_byte
+
+import pytest
+from bitarray import bitarray
+
+from plc4py.spi.generation.WriteBuffer import WriteBufferByteBased
+
+from plc4py.utils.GenericTypes import ByteOrder
+
+
+def test_write_buffer_get_bytes(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(10, ByteOrder.LITTLE_ENDIAN)
+    ba: memoryview = wb.get_bytes()
+    assert(10 == ba.nbytes)
+
+
+def test_write_buffer_get_pos(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(10, ByteOrder.LITTLE_ENDIAN)
+    pos: int = wb.get_pos()
+    assert(0 == pos)
+
+
+def test_write_buffer_set_bit_little_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+    wb.write_bit(c_bool(True), "Test String")
+    ba: memoryview = wb.get_bytes()
+    assert(b'\x01' == ba.tobytes())
+
+
+def test_write_buffer_set_bit_big_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+    wb.write_bit(c_bool(True), "Test String")
+    ba: memoryview = wb.get_bytes()
+    assert(b'\x80' == ba.tobytes())
+
+
+def test_write_buffer_set_bit_get_pos(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+    wb.write_bit(c_bool(True), "Test String")
+    pos: int = wb.get_pos()
+    assert(1 == pos)
+
+
+def test_write_buffer_set_bit_x2_little_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+    wb.write_bit(c_bool(True), "Test String 1")
+    wb.write_bit(c_bool(True), "Test String 2")
+    ba: memoryview = wb.get_bytes()
+    assert (b'\x03' == ba.tobytes())
+
+
+def test_write_buffer_set_bit_x2_big_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+    wb.write_bit(c_bool(True), "Test String 1")
+    wb.write_bit(c_bool(True), "Test String 2")
+    ba: memoryview = wb.get_bytes()
+    assert (b'\xc0' == ba.tobytes())
+
+
+def test_write_buffer_set_bit_get_pos_dual(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+    wb.write_bit(c_bool(True), "Test String 1")
+    wb.write_bit(c_bool(True), "Test String 2")
+    pos: int = wb.get_pos()
+    assert(pos == 2)
+
+
+def test_write_buffer_set_unsigned_byte_get_pos(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+    wb.write_unsigned_byte(c_byte(0xFF), 8, "Test String 1")
+    pos: int = wb.get_pos()
+    assert(pos == 8)
+
+
+def test_write_buffer_set_unsigned_byte_little_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+    wb.write_unsigned_byte(c_byte(0x12), 8, "Test String 1")
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("01001000"))
+
+
+def test_write_buffer_set_unsigned_byte_big_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+    wb.write_unsigned_byte(c_byte(0x12), 8, "Test String 1")
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("00010010"))
+
+
+def test_write_buffer_set_unsigned_byte_little_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+    wb.write_unsigned_byte(c_byte(0x12), 4, "Test String 1")
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("01000000"))
+
+
+def test_write_buffer_set_unsigned_byte_big_endian(mocker) -> None:
+    wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+    wb.write_unsigned_byte(c_byte(0x12), 4, "Test String 1")
+    ba: memoryview = wb.get_bytes()
+    assert (ba.obj == bitarray("00010000"))
\ No newline at end of file