You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2023/05/11 03:58:55 UTC
[plc4x] branch develop updated: feat(plc4py): WriteBuffer interface and WriteBufferByteBased implementation
This is an automated email from the ASF dual-hosted git repository.
hutcheb pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 99ed37c863 feat(plc4py): WriteBuffer interface and WriteBufferByteBased implementation
99ed37c863 is described below
commit 99ed37c86330a2ef0421d9ed81e918cce87ceb42
Author: Ben Hutcheson <be...@gmail.com>
AuthorDate: Thu May 11 05:53:53 2023 +0200
feat(plc4py): WriteBuffer interface and WriteBufferByteBased implementation
---
sandbox/plc4py/plc4py/api/exceptions/exceptions.py | 4 +
.../plc4py/plc4py/spi/generation/WriteBuffer.py | 94 +++++++++++++---
sandbox/plc4py/plc4py/spi/values/Common.py | 7 ++
sandbox/plc4py/plc4py/spi/values/common.py | 23 ----
sandbox/plc4py/plc4py/utils/GenericTypes.py | 7 ++
sandbox/plc4py/setup.py | 1 +
.../tests/unit/plc4py/spi/test_write_buffer.py | 118 +++++++++++++++++++++
7 files changed, 217 insertions(+), 37 deletions(-)
diff --git a/sandbox/plc4py/plc4py/api/exceptions/exceptions.py b/sandbox/plc4py/plc4py/api/exceptions/exceptions.py
index c42fabafa7..9082844959 100644
--- a/sandbox/plc4py/plc4py/api/exceptions/exceptions.py
+++ b/sandbox/plc4py/plc4py/api/exceptions/exceptions.py
@@ -33,3 +33,7 @@ class PlcFieldParseException(Exception):
class PlcNotImplementedException(Exception):
pass
+
+
+class SerializationException(Exception):
+ pass
diff --git a/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py b/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
index 63d4d5969c..a04d71af51 100644
--- a/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
+++ b/sandbox/plc4py/plc4py/spi/generation/WriteBuffer.py
@@ -19,7 +19,10 @@ from ctypes import *
from dataclasses import dataclass
from typing import List
-from plc4py.spi.values.common import Serializable
+from bitarray import bitarray
+from bitarray.util import zeros
+
+from plc4py.api.exceptions.exceptions import SerializationException
from plc4py.utils.GenericTypes import ByteOrder, ByteOrderAware
@@ -46,42 +49,42 @@ class WriteBuffer(ByteOrderAware, PositionAware):
raise NotImplementedError
def write_byte(self, value: c_byte, logical_name: str = "", **kwargs) -> None:
- self.write_signed_byte(value, logical_name, **kwargs)
+ self.write_signed_byte(value, 8, logical_name, **kwargs)
def write_byte_array(self, value: List[c_byte], logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_unsigned_byte(self, value: c_ubyte, logical_name: str = "", **kwargs) -> None:
+ def write_unsigned_byte(self, value: c_ubyte, bit_length: int = 8, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_unsigned_short(self, value: c_uint16, logical_name: str = "", **kwargs) -> None:
+ def write_unsigned_short(self, value: c_uint16, bit_length: int = 16, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_unsigned_int(self, value: c_uint32, logical_name: str = "", **kwargs) -> None:
+ def write_unsigned_int(self, value: c_uint32, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_unsigned_long(self, value: c_uint64, logical_name: str = "", **kwargs) -> None:
+ def write_unsigned_long(self, value: c_uint64, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_signed_byte(self, value: c_byte, logical_name: str = "", **kwargs) -> None:
+ def write_signed_byte(self, value: c_byte, bit_length: int = 8, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_short(self, value: c_int16, logical_name: str = "", **kwargs) -> None:
+ def write_short(self, value: c_int16, bit_length: int = 16, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_int(self, value: c_int32, logical_name: str = "", **kwargs) -> None:
+ def write_int(self, value: c_int32, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_long(self, value: c_int64, logical_name: str = "", **kwargs) -> None:
+ def write_long(self, value: c_int64, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_float(self, value: c_float, logical_name: str = "", **kwargs) -> None:
+ def write_float(self, value: c_float, bit_length: int = 32, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_double(self, value: c_double, logical_name: str = "", **kwargs) -> None:
+ def write_double(self, value: c_double, bit_length: int = 64, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
- def write_str(self, value: str, logical_name: str = "", bit_length: int = -1, **kwargs) -> None:
+ def write_str(self, value: str, bit_length: int = -1, logical_name: str = "", **kwargs) -> None:
raise NotImplementedError
def write_virtual(self, value: str, logical_name: str = "", **kwargs) -> None:
@@ -93,5 +96,68 @@ class WriteBuffer(ByteOrderAware, PositionAware):
# @param value the value to be serialized
# @throws SerializationException if something goes wrong
#
- def write_serializable(self, value: Serializable) -> None:
+ def write_serializable(self, value) -> None:
value.serialize(self)
+
+
+class WriteBufferByteBased(WriteBuffer):
+ byte_order: ByteOrder
+ position: int = 0
+ bb: bitarray
+
+ def __init__(self, size: int, byte_order: ByteOrder):
+ self.bb = zeros(size * 8, endian=ByteOrder.get_short_name(byte_order))
+ self.byte_order = byte_order
+
+ def get_bytes(self) -> memoryview:
+ return memoryview(self.bb)
+
+ def get_pos(self) -> int:
+ return self.position
+
+ def push_context(self, logical_name: str, **kwargs) -> None:
+ # byte buffer needs no context handling
+ pass
+
+ def write_bit(self, value: c_bool, logical_name: str = "", *kwargs) -> None:
+ self.bb[self.position] = bool(value)
+ self.position += 1
+
+ def write_byte(self, value: c_byte, logical_name: str = "", *kwargs) -> None:
+ self.write_signed_byte(value, logical_name, **kwargs)
+
+ def write_byte_array(self, value: List[c_byte], logical_name: str = "", *kwargs) -> None:
+ for aByte in value:
+ self.write_signed_byte(aByte, logical_name, **kwargs)
+
+ def write_unsigned_byte(self, value: c_byte, bit_length: int = 8, logical_name: str = "", *kwargs) -> None:
+ if bit_length <= 0:
+ raise SerializationException("unsigned byte can only contain max 8 bits")
+ elif bit_length > 8:
+ raise SerializationException("unsigned byte can only contain max 8 bits")
+ else:
+ src = bitarray(endian=ByteOrder.get_short_name(self.byte_order))
+ src.frombytes(value)
+ self.bb[self.position:bit_length] = src[:bit_length]
+ self.position += bit_length
+
+ # def write_unsigned_short(self, value: c_uint16, bit_length: int = 16, logical_name: str = "", *kwargs) -> None:
+ # if bit_length <= 0:
+ # raise SerializationException("unsigned short can only contain max 8 bits")
+ # elif bit_length > 8:
+ # raise SerializationException("unsigned short can only contain max 16 bits")
+ # else:
+ # self.bb.append(bytes(value))
+ # value_encoding: str = kwargs.get("encoding", "default")
+ # if value_encoding == "ASCII":
+ # if bit_length % 8 != 0:
+ # raise SerializationException("'ASCII' encoded fields must have a length that is a multiple of 8 bits long")
+ # char_len: int = int(bit_length / 8)
+ # max_value: int = int(10**char_len - 1)
+ # if value > max_value:
+ # raise SerializationException("Provided value of " + str(value) + " exceeds the max value of " + str(max_value))
+ # string_value: str = "{}".format(value)
+ # self.bb.append(string_value)
+ # elif value_encoding == "default":
+ # self.bb[self.position:bit_length] = value
+ # self.postion += bit_length
diff --git a/sandbox/plc4py/plc4py/spi/values/Common.py b/sandbox/plc4py/plc4py/spi/values/Common.py
new file mode 100644
index 0000000000..77a7351cdf
--- /dev/null
+++ b/sandbox/plc4py/plc4py/spi/values/Common.py
@@ -0,0 +1,7 @@
+from plc4py.spi.generation import WriteBuffer
+
+
+class Serializable:
+
+ def serialize(self, write_buffer: WriteBuffer):
+ pass
\ No newline at end of file
diff --git a/sandbox/plc4py/plc4py/spi/values/common.py b/sandbox/plc4py/plc4py/spi/values/common.py
deleted file mode 100644
index c5ca06e1a6..0000000000
--- a/sandbox/plc4py/plc4py/spi/values/common.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from plc4py.spi.generation.WriteBuffer import WriteBuffer
-
-
-class Serializable:
-
- def serialize(self, write_buffer: WriteBuffer):
- pass
diff --git a/sandbox/plc4py/plc4py/utils/GenericTypes.py b/sandbox/plc4py/plc4py/utils/GenericTypes.py
index 9b46d976dc..c1fb8ed5e1 100644
--- a/sandbox/plc4py/plc4py/utils/GenericTypes.py
+++ b/sandbox/plc4py/plc4py/utils/GenericTypes.py
@@ -49,6 +49,13 @@ class ByteOrder(Enum):
obj._value_ = value
return obj
+ @staticmethod
+ def get_short_name(order):
+ if order == ByteOrder.LITTLE_ENDIAN:
+ return "little"
+ elif order == ByteOrder.BIG_ENDIAN:
+ return "big"
+
@dataclass
class ByteOrderAware:
diff --git a/sandbox/plc4py/setup.py b/sandbox/plc4py/setup.py
index 6bc919296f..6543000684 100644
--- a/sandbox/plc4py/setup.py
+++ b/sandbox/plc4py/setup.py
@@ -44,6 +44,7 @@ setup(
"black",
"pip",
"aenum",
+ "bitarray"
],
extras_require={
"dev": [
diff --git a/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py b/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
new file mode 100644
index 0000000000..3a2284dc8a
--- /dev/null
+++ b/sandbox/plc4py/tests/unit/plc4py/spi/test_write_buffer.py
@@ -0,0 +1,118 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from ctypes import c_bool, c_byte
+
+import pytest
+from bitarray import bitarray
+
+from plc4py.spi.generation.WriteBuffer import WriteBufferByteBased
+
+from plc4py.utils.GenericTypes import ByteOrder
+
+
+def test_write_buffer_get_bytes(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(10, ByteOrder.LITTLE_ENDIAN)
+ ba: memoryview = wb.get_bytes()
+ assert(10 == ba.nbytes)
+
+
+def test_write_buffer_get_pos(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(10, ByteOrder.LITTLE_ENDIAN)
+ pos: int = wb.get_pos()
+ assert(0 == pos)
+
+
+def test_write_buffer_set_bit_little_endian(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+ wb.write_bit(c_bool(True), "Test String")
+ ba: memoryview = wb.get_bytes()
+ assert(b'\x01' == ba.tobytes())
+
+
+def test_write_buffer_set_bit_big_endian(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+ wb.write_bit(c_bool(True), "Test String")
+ ba: memoryview = wb.get_bytes()
+ assert(b'\x80' == ba.tobytes())
+
+
+def test_write_buffer_set_bit_get_pos(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+ wb.write_bit(c_bool(True), "Test String")
+ pos: int = wb.get_pos()
+ assert(1 == pos)
+
+
+def test_write_buffer_set_bit_x2_little_endian(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+ wb.write_bit(c_bool(True), "Test String 1")
+ wb.write_bit(c_bool(True), "Test String 2")
+ ba: memoryview = wb.get_bytes()
+ assert (b'\x03' == ba.tobytes())
+
+
+def test_write_buffer_set_bit_x2_big_endian(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+ wb.write_bit(c_bool(True), "Test String 1")
+ wb.write_bit(c_bool(True), "Test String 2")
+ ba: memoryview = wb.get_bytes()
+ assert (b'\xc0' == ba.tobytes())
+
+
+def test_write_buffer_set_bit_get_pos_dual(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+ wb.write_bit(c_bool(True), "Test String 1")
+ wb.write_bit(c_bool(True), "Test String 2")
+ pos: int = wb.get_pos()
+ assert(pos == 2)
+
+
+def test_write_buffer_set_unsigned_byte_get_pos(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+ wb.write_unsigned_byte(c_byte(0xFF), 8, "Test String 1")
+ pos: int = wb.get_pos()
+ assert(pos == 8)
+
+
+def test_write_buffer_set_unsigned_byte_little_endian(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+ wb.write_unsigned_byte(c_byte(0x12), 8, "Test String 1")
+ ba: memoryview = wb.get_bytes()
+ assert (ba.obj == bitarray("01001000"))
+
+
+def test_write_buffer_set_unsigned_byte_big_endian(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+ wb.write_unsigned_byte(c_byte(0x12), 8, "Test String 1")
+ ba: memoryview = wb.get_bytes()
+ assert (ba.obj == bitarray("00010010"))
+
+
+def test_write_buffer_set_unsigned_byte_little_endian(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.LITTLE_ENDIAN)
+ wb.write_unsigned_byte(c_byte(0x12), 4, "Test String 1")
+ ba: memoryview = wb.get_bytes()
+ assert (ba.obj == bitarray("01000000"))
+
+
+def test_write_buffer_set_unsigned_byte_big_endian(mocker) -> None:
+ wb: WriteBufferByteBased = WriteBufferByteBased(1, ByteOrder.BIG_ENDIAN)
+ wb.write_unsigned_byte(c_byte(0x12), 4, "Test String 1")
+ ba: memoryview = wb.get_bytes()
+ assert (ba.obj == bitarray("00010000"))
\ No newline at end of file