You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/20 06:03:17 UTC

[plc4x] branch develop updated (5894b08ef7 -> e5cf78485b)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 5894b08ef7 fix(plc4go/spi): fix concurrency issue when an executor is being started and stopped pretty fast
     new 16e91ad87f fix(plc4go/tools): licenser should now output the right file name
     new 62bc2ae77f refactor(plc4go/spi): abstract bufio.Reader through an interface
     new 14c59f0f4a fix(plc4go/spi): sync tcp.TransportInstance state change
     new e5cf78485b test(plc4go/cbus): set executor for browse test

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/ads/MessageCodec.go                |  10 +-
 plc4go/internal/cbus/Browser_test.go               |  14 ++
 plc4go/internal/cbus/MessageCodec.go               |  13 +-
 plc4go/spi/default/DefaultCodec.go                 |   1 +
 plc4go/spi/default/mock_TransportInstance_test.go  |  14 +-
 .../testutils/mock_TestTransportInstance_test.go   |  14 +-
 plc4go/spi/testutils/mock_WithOption_test.go       |  93 --------
 .../spi/transports/ExtendedReader.go               |  14 +-
 plc4go/spi/transports/TransportInstance.go         |   3 +-
 plc4go/spi/transports/mock_ExtendedReader_test.go  | 249 +++++++++++++++++++++
 .../spi/transports/mock_TransportInstance_test.go  |  13 +-
 plc4go/spi/transports/pcap/TransportInstance.go    |   3 +-
 plc4go/spi/transports/serial/TransportInstance.go  |   7 +-
 plc4go/spi/transports/tcp/TransportInstance.go     |  36 ++-
 .../spi/transports/tcp/TransportInstance_test.go   | 132 +++++------
 plc4go/spi/transports/test/TransportInstance.go    |   3 +-
 .../spi/transports/test/TransportInstance_test.go  |   9 +-
 plc4go/spi/transports/udp/TransportInstance.go     |   3 +-
 .../spi/transports/udp/TransportInstance_test.go   |   5 +-
 .../utils/DefaultBufferedTransportInstance.go      |   8 +-
 .../utils/DefaultBufferedTransportInstance_test.go |   8 +-
 ...ltBufferedTransportInstanceRequirements_test.go |  15 +-
 .../mock_DefaultBufferedTransportInstance_test.go  |  14 +-
 plc4go/tools/plc4xlicenser/gen.go                  |   2 +-
 24 files changed, 433 insertions(+), 250 deletions(-)
 delete mode 100644 plc4go/spi/testutils/mock_WithOption_test.go
 copy code-generation/protocol-base-mspec/src/main/java/org/apache/plc4x/plugins/codegenerator/language/mspec/model/references/DefaultUndefinedTypeReference.java => plc4go/spi/transports/ExtendedReader.go (74%)
 create mode 100644 plc4go/spi/transports/mock_ExtendedReader_test.go


[plc4x] 02/04: refactor(plc4go/spi): abstract bufio.Reader through an interface

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 62bc2ae77f8e416fe14caae9b0e588ce983b64e3
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 20:41:57 2023 +0200

    refactor(plc4go/spi): abstract bufio.Reader through an interface
---
 plc4go/internal/ads/MessageCodec.go                |  10 +-
 plc4go/internal/cbus/MessageCodec.go               |  13 +-
 plc4go/spi/default/mock_TransportInstance_test.go  |  14 +-
 .../testutils/mock_TestTransportInstance_test.go   |  14 +-
 plc4go/spi/testutils/mock_WithOption_test.go       |  93 --------
 .../{TransportInstance.go => ExtendedReader.go}    |  29 +--
 plc4go/spi/transports/TransportInstance.go         |   3 +-
 plc4go/spi/transports/mock_ExtendedReader_test.go  | 249 +++++++++++++++++++++
 .../spi/transports/mock_TransportInstance_test.go  |  13 +-
 plc4go/spi/transports/pcap/TransportInstance.go    |   3 +-
 plc4go/spi/transports/serial/TransportInstance.go  |   7 +-
 plc4go/spi/transports/tcp/TransportInstance.go     |  11 +-
 .../spi/transports/tcp/TransportInstance_test.go   | 132 +++++------
 plc4go/spi/transports/test/TransportInstance.go    |   3 +-
 .../spi/transports/test/TransportInstance_test.go  |   9 +-
 plc4go/spi/transports/udp/TransportInstance.go     |   3 +-
 .../spi/transports/udp/TransportInstance_test.go   |   5 +-
 .../utils/DefaultBufferedTransportInstance.go      |   8 +-
 .../utils/DefaultBufferedTransportInstance_test.go |   8 +-
 ...ltBufferedTransportInstanceRequirements_test.go |  15 +-
 .../mock_DefaultBufferedTransportInstance_test.go  |  14 +-
 21 files changed, 397 insertions(+), 259 deletions(-)

diff --git a/plc4go/internal/ads/MessageCodec.go b/plc4go/internal/ads/MessageCodec.go
index 75f85416a4..e711f1d899 100644
--- a/plc4go/internal/ads/MessageCodec.go
+++ b/plc4go/internal/ads/MessageCodec.go
@@ -20,18 +20,18 @@
 package ads
 
 import (
-	"bufio"
 	"context"
 	"encoding/binary"
-	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/rs/zerolog"
 
 	"github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/utils"
+
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
 )
 
 type MessageCodec struct {
@@ -87,7 +87,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 	transportInstance := m.GetTransportInstance()
 
 	if err := transportInstance.FillBuffer(
-		func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+		func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 			numBytesAvailable, err := transportInstance.GetNumBytesAvailableInBuffer()
 			if err != nil {
 				return false
@@ -110,7 +110,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 		packetSize := (uint32(data[5]) << 24) + (uint32(data[4]) << 16) + (uint32(data[3]) << 8) + (uint32(data[2])) + 6
 		if num < packetSize {
 			if err := transportInstance.FillBuffer(
-				func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+				func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					numBytesAvailable, err := transportInstance.GetNumBytesAvailableInBuffer()
 					if err != nil {
 						return false
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index e2856f57c2..7151da9dba 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -20,16 +20,16 @@
 package cbus
 
 import (
-	"bufio"
 	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
-	"sync"
-	"sync/atomic"
-	"time"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
@@ -129,10 +129,13 @@ func (m *MessageCodec) Send(message spi.Message) error {
 func (m *MessageCodec) Receive() (spi.Message, error) {
 	m.log.Trace().Msg("Receive")
 	ti := m.GetTransportInstance()
+	if !ti.IsConnected() {
+		return nil, errors.New("Transport instance not connected")
+	}
 	confirmation := false
 	// Fill the buffer
 	{
-		if err := ti.FillBuffer(func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+		if err := ti.FillBuffer(func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 			m.log.Trace().Uint8("byte", currentByte).Msg("current byte")
 			switch currentByte {
 			case
diff --git a/plc4go/spi/default/mock_TransportInstance_test.go b/plc4go/spi/default/mock_TransportInstance_test.go
index 93e23c116b..f331da8781 100644
--- a/plc4go/spi/default/mock_TransportInstance_test.go
+++ b/plc4go/spi/default/mock_TransportInstance_test.go
@@ -22,9 +22,9 @@
 package _default
 
 import (
-	bufio "bufio"
 	context "context"
 
+	transports "github.com/apache/plc4x/plc4go/spi/transports"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -166,11 +166,11 @@ func (_c *MockTransportInstance_ConnectWithContext_Call) RunAndReturn(run func(c
 }
 
 // FillBuffer provides a mock function with given fields: until
-func (_m *MockTransportInstance) FillBuffer(until func(uint, byte, *bufio.Reader) bool) error {
+func (_m *MockTransportInstance) FillBuffer(until func(uint, byte, transports.ExtendedReader) bool) error {
 	ret := _m.Called(until)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(func(uint, byte, *bufio.Reader) bool) error); ok {
+	if rf, ok := ret.Get(0).(func(func(uint, byte, transports.ExtendedReader) bool) error); ok {
 		r0 = rf(until)
 	} else {
 		r0 = ret.Error(0)
@@ -185,14 +185,14 @@ type MockTransportInstance_FillBuffer_Call struct {
 }
 
 // FillBuffer is a helper method to define mock.On call
-//   - until func(uint , byte , *bufio.Reader) bool
+//   - until func(uint , byte , transports.ExtendedReader) bool
 func (_e *MockTransportInstance_Expecter) FillBuffer(until interface{}) *MockTransportInstance_FillBuffer_Call {
 	return &MockTransportInstance_FillBuffer_Call{Call: _e.mock.On("FillBuffer", until)}
 }
 
-func (_c *MockTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, *bufio.Reader) bool)) *MockTransportInstance_FillBuffer_Call {
+func (_c *MockTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, transports.ExtendedReader) bool)) *MockTransportInstance_FillBuffer_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(func(uint, byte, *bufio.Reader) bool))
+		run(args[0].(func(uint, byte, transports.ExtendedReader) bool))
 	})
 	return _c
 }
@@ -202,7 +202,7 @@ func (_c *MockTransportInstance_FillBuffer_Call) Return(_a0 error) *MockTranspor
 	return _c
 }
 
-func (_c *MockTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, *bufio.Reader) bool) error) *MockTransportInstance_FillBuffer_Call {
+func (_c *MockTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, transports.ExtendedReader) bool) error) *MockTransportInstance_FillBuffer_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/plc4go/spi/testutils/mock_TestTransportInstance_test.go b/plc4go/spi/testutils/mock_TestTransportInstance_test.go
index 8c71e00a23..ed0975f6d3 100644
--- a/plc4go/spi/testutils/mock_TestTransportInstance_test.go
+++ b/plc4go/spi/testutils/mock_TestTransportInstance_test.go
@@ -22,9 +22,9 @@
 package testutils
 
 import (
-	bufio "bufio"
 	context "context"
 
+	transports "github.com/apache/plc4x/plc4go/spi/transports"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -210,11 +210,11 @@ func (_c *MockTestTransportInstance_DrainWriteBuffer_Call) RunAndReturn(run func
 }
 
 // FillBuffer provides a mock function with given fields: until
-func (_m *MockTestTransportInstance) FillBuffer(until func(uint, byte, *bufio.Reader) bool) error {
+func (_m *MockTestTransportInstance) FillBuffer(until func(uint, byte, transports.ExtendedReader) bool) error {
 	ret := _m.Called(until)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(func(uint, byte, *bufio.Reader) bool) error); ok {
+	if rf, ok := ret.Get(0).(func(func(uint, byte, transports.ExtendedReader) bool) error); ok {
 		r0 = rf(until)
 	} else {
 		r0 = ret.Error(0)
@@ -229,14 +229,14 @@ type MockTestTransportInstance_FillBuffer_Call struct {
 }
 
 // FillBuffer is a helper method to define mock.On call
-//   - until func(uint , byte , *bufio.Reader) bool
+//   - until func(uint , byte , transports.ExtendedReader) bool
 func (_e *MockTestTransportInstance_Expecter) FillBuffer(until interface{}) *MockTestTransportInstance_FillBuffer_Call {
 	return &MockTestTransportInstance_FillBuffer_Call{Call: _e.mock.On("FillBuffer", until)}
 }
 
-func (_c *MockTestTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, *bufio.Reader) bool)) *MockTestTransportInstance_FillBuffer_Call {
+func (_c *MockTestTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, transports.ExtendedReader) bool)) *MockTestTransportInstance_FillBuffer_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(func(uint, byte, *bufio.Reader) bool))
+		run(args[0].(func(uint, byte, transports.ExtendedReader) bool))
 	})
 	return _c
 }
@@ -246,7 +246,7 @@ func (_c *MockTestTransportInstance_FillBuffer_Call) Return(_a0 error) *MockTest
 	return _c
 }
 
-func (_c *MockTestTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, *bufio.Reader) bool) error) *MockTestTransportInstance_FillBuffer_Call {
+func (_c *MockTestTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, transports.ExtendedReader) bool) error) *MockTestTransportInstance_FillBuffer_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/plc4go/spi/testutils/mock_WithOption_test.go b/plc4go/spi/testutils/mock_WithOption_test.go
deleted file mode 100644
index a4c12463e2..0000000000
--- a/plc4go/spi/testutils/mock_WithOption_test.go
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// Code generated by mockery v2.28.2. DO NOT EDIT.
-
-package testutils
-
-import mock "github.com/stretchr/testify/mock"
-
-// MockWithOption is an autogenerated mock type for the WithOption type
-type MockWithOption struct {
-	mock.Mock
-}
-
-type MockWithOption_Expecter struct {
-	mock *mock.Mock
-}
-
-func (_m *MockWithOption) EXPECT() *MockWithOption_Expecter {
-	return &MockWithOption_Expecter{mock: &_m.Mock}
-}
-
-// isOption provides a mock function with given fields:
-func (_m *MockWithOption) isOption() bool {
-	ret := _m.Called()
-
-	var r0 bool
-	if rf, ok := ret.Get(0).(func() bool); ok {
-		r0 = rf()
-	} else {
-		r0 = ret.Get(0).(bool)
-	}
-
-	return r0
-}
-
-// MockWithOption_isOption_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'isOption'
-type MockWithOption_isOption_Call struct {
-	*mock.Call
-}
-
-// isOption is a helper method to define mock.On call
-func (_e *MockWithOption_Expecter) isOption() *MockWithOption_isOption_Call {
-	return &MockWithOption_isOption_Call{Call: _e.mock.On("isOption")}
-}
-
-func (_c *MockWithOption_isOption_Call) Run(run func()) *MockWithOption_isOption_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run()
-	})
-	return _c
-}
-
-func (_c *MockWithOption_isOption_Call) Return(_a0 bool) *MockWithOption_isOption_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
-
-func (_c *MockWithOption_isOption_Call) RunAndReturn(run func() bool) *MockWithOption_isOption_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-type mockConstructorTestingTNewMockWithOption interface {
-	mock.TestingT
-	Cleanup(func())
-}
-
-// NewMockWithOption creates a new instance of MockWithOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-func NewMockWithOption(t mockConstructorTestingTNewMockWithOption) *MockWithOption {
-	mock := &MockWithOption{}
-	mock.Mock.Test(t)
-
-	t.Cleanup(func() { mock.AssertExpectations(t) })
-
-	return mock
-}
diff --git a/plc4go/spi/transports/TransportInstance.go b/plc4go/spi/transports/ExtendedReader.go
similarity index 54%
copy from plc4go/spi/transports/TransportInstance.go
copy to plc4go/spi/transports/ExtendedReader.go
index e81d6821b9..bb9fe3e847 100644
--- a/plc4go/spi/transports/TransportInstance.go
+++ b/plc4go/spi/transports/ExtendedReader.go
@@ -19,26 +19,13 @@
 
 package transports
 
-import (
-	"bufio"
-	"context"
-	"fmt"
-)
+import "io"
 
-type TransportInstance interface {
-	fmt.Stringer
-	Connect() error
-	ConnectWithContext(ctx context.Context) error
-	Close() error
-
-	IsConnected() bool
-
-	// FillBuffer fills the buffer `until` false (Useful in conjunction if you want GetNumBytesAvailableInBuffer)
-	FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error
-	// GetNumBytesAvailableInBuffer returns the bytes currently available in buffer (!!!Careful: if you looking for a termination you have to use FillBuffer)
-	GetNumBytesAvailableInBuffer() (uint32, error)
-	PeekReadableBytes(numBytes uint32) ([]byte, error)
-	Read(numBytes uint32) ([]byte, error)
-
-	Write(data []byte) error
+type ExtendedReader interface {
+	io.Reader
+	io.ByteReader
+	// Peek returns the next n bytes without advancing the reader.
+	Peek(int) ([]byte, error)
+	// Buffered returns the number of bytes that can be read from the current buffer.
+	Buffered() int
 }
diff --git a/plc4go/spi/transports/TransportInstance.go b/plc4go/spi/transports/TransportInstance.go
index e81d6821b9..ee121c3a87 100644
--- a/plc4go/spi/transports/TransportInstance.go
+++ b/plc4go/spi/transports/TransportInstance.go
@@ -20,7 +20,6 @@
 package transports
 
 import (
-	"bufio"
 	"context"
 	"fmt"
 )
@@ -34,7 +33,7 @@ type TransportInstance interface {
 	IsConnected() bool
 
 	// FillBuffer fills the buffer `until` false (Useful in conjunction if you want GetNumBytesAvailableInBuffer)
-	FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error
+	FillBuffer(until func(pos uint, currentByte byte, reader ExtendedReader) bool) error
 	// GetNumBytesAvailableInBuffer returns the bytes currently available in buffer (!!!Careful: if you looking for a termination you have to use FillBuffer)
 	GetNumBytesAvailableInBuffer() (uint32, error)
 	PeekReadableBytes(numBytes uint32) ([]byte, error)
diff --git a/plc4go/spi/transports/mock_ExtendedReader_test.go b/plc4go/spi/transports/mock_ExtendedReader_test.go
new file mode 100644
index 0000000000..9b9e291e77
--- /dev/null
+++ b/plc4go/spi/transports/mock_ExtendedReader_test.go
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Code generated by mockery v2.30.1. DO NOT EDIT.
+
+package transports
+
+import mock "github.com/stretchr/testify/mock"
+
+// MockExtendedReader is an autogenerated mock type for the ExtendedReader type
+type MockExtendedReader struct {
+	mock.Mock
+}
+
+type MockExtendedReader_Expecter struct {
+	mock *mock.Mock
+}
+
+func (_m *MockExtendedReader) EXPECT() *MockExtendedReader_Expecter {
+	return &MockExtendedReader_Expecter{mock: &_m.Mock}
+}
+
+// Buffered provides a mock function with given fields:
+func (_m *MockExtendedReader) Buffered() int {
+	ret := _m.Called()
+
+	var r0 int
+	if rf, ok := ret.Get(0).(func() int); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(int)
+	}
+
+	return r0
+}
+
+// MockExtendedReader_Buffered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Buffered'
+type MockExtendedReader_Buffered_Call struct {
+	*mock.Call
+}
+
+// Buffered is a helper method to define mock.On call
+func (_e *MockExtendedReader_Expecter) Buffered() *MockExtendedReader_Buffered_Call {
+	return &MockExtendedReader_Buffered_Call{Call: _e.mock.On("Buffered")}
+}
+
+func (_c *MockExtendedReader_Buffered_Call) Run(run func()) *MockExtendedReader_Buffered_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExtendedReader_Buffered_Call) Return(_a0 int) *MockExtendedReader_Buffered_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExtendedReader_Buffered_Call) RunAndReturn(run func() int) *MockExtendedReader_Buffered_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Peek provides a mock function with given fields: _a0
+func (_m *MockExtendedReader) Peek(_a0 int) ([]byte, error) {
+	ret := _m.Called(_a0)
+
+	var r0 []byte
+	var r1 error
+	if rf, ok := ret.Get(0).(func(int) ([]byte, error)); ok {
+		return rf(_a0)
+	}
+	if rf, ok := ret.Get(0).(func(int) []byte); ok {
+		r0 = rf(_a0)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]byte)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(int) error); ok {
+		r1 = rf(_a0)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockExtendedReader_Peek_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Peek'
+type MockExtendedReader_Peek_Call struct {
+	*mock.Call
+}
+
+// Peek is a helper method to define mock.On call
+//   - _a0 int
+func (_e *MockExtendedReader_Expecter) Peek(_a0 interface{}) *MockExtendedReader_Peek_Call {
+	return &MockExtendedReader_Peek_Call{Call: _e.mock.On("Peek", _a0)}
+}
+
+func (_c *MockExtendedReader_Peek_Call) Run(run func(_a0 int)) *MockExtendedReader_Peek_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int))
+	})
+	return _c
+}
+
+func (_c *MockExtendedReader_Peek_Call) Return(_a0 []byte, _a1 error) *MockExtendedReader_Peek_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockExtendedReader_Peek_Call) RunAndReturn(run func(int) ([]byte, error)) *MockExtendedReader_Peek_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Read provides a mock function with given fields: p
+func (_m *MockExtendedReader) Read(p []byte) (int, error) {
+	ret := _m.Called(p)
+
+	var r0 int
+	var r1 error
+	if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok {
+		return rf(p)
+	}
+	if rf, ok := ret.Get(0).(func([]byte) int); ok {
+		r0 = rf(p)
+	} else {
+		r0 = ret.Get(0).(int)
+	}
+
+	if rf, ok := ret.Get(1).(func([]byte) error); ok {
+		r1 = rf(p)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockExtendedReader_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read'
+type MockExtendedReader_Read_Call struct {
+	*mock.Call
+}
+
+// Read is a helper method to define mock.On call
+//   - p []byte
+func (_e *MockExtendedReader_Expecter) Read(p interface{}) *MockExtendedReader_Read_Call {
+	return &MockExtendedReader_Read_Call{Call: _e.mock.On("Read", p)}
+}
+
+func (_c *MockExtendedReader_Read_Call) Run(run func(p []byte)) *MockExtendedReader_Read_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].([]byte))
+	})
+	return _c
+}
+
+func (_c *MockExtendedReader_Read_Call) Return(n int, err error) *MockExtendedReader_Read_Call {
+	_c.Call.Return(n, err)
+	return _c
+}
+
+func (_c *MockExtendedReader_Read_Call) RunAndReturn(run func([]byte) (int, error)) *MockExtendedReader_Read_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// ReadByte provides a mock function with given fields:
+func (_m *MockExtendedReader) ReadByte() (byte, error) {
+	ret := _m.Called()
+
+	var r0 byte
+	var r1 error
+	if rf, ok := ret.Get(0).(func() (byte, error)); ok {
+		return rf()
+	}
+	if rf, ok := ret.Get(0).(func() byte); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(byte)
+	}
+
+	if rf, ok := ret.Get(1).(func() error); ok {
+		r1 = rf()
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockExtendedReader_ReadByte_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadByte'
+type MockExtendedReader_ReadByte_Call struct {
+	*mock.Call
+}
+
+// ReadByte is a helper method to define mock.On call
+func (_e *MockExtendedReader_Expecter) ReadByte() *MockExtendedReader_ReadByte_Call {
+	return &MockExtendedReader_ReadByte_Call{Call: _e.mock.On("ReadByte")}
+}
+
+func (_c *MockExtendedReader_ReadByte_Call) Run(run func()) *MockExtendedReader_ReadByte_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExtendedReader_ReadByte_Call) Return(_a0 byte, _a1 error) *MockExtendedReader_ReadByte_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockExtendedReader_ReadByte_Call) RunAndReturn(run func() (byte, error)) *MockExtendedReader_ReadByte_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// NewMockExtendedReader creates a new instance of MockExtendedReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMockExtendedReader(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *MockExtendedReader {
+	mock := &MockExtendedReader{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/plc4go/spi/transports/mock_TransportInstance_test.go b/plc4go/spi/transports/mock_TransportInstance_test.go
index 2055e06b88..d85643393f 100644
--- a/plc4go/spi/transports/mock_TransportInstance_test.go
+++ b/plc4go/spi/transports/mock_TransportInstance_test.go
@@ -22,7 +22,6 @@
 package transports
 
 import (
-	bufio "bufio"
 	context "context"
 
 	mock "github.com/stretchr/testify/mock"
@@ -166,11 +165,11 @@ func (_c *MockTransportInstance_ConnectWithContext_Call) RunAndReturn(run func(c
 }
 
 // FillBuffer provides a mock function with given fields: until
-func (_m *MockTransportInstance) FillBuffer(until func(uint, byte, *bufio.Reader) bool) error {
+func (_m *MockTransportInstance) FillBuffer(until func(uint, byte, ExtendedReader) bool) error {
 	ret := _m.Called(until)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(func(uint, byte, *bufio.Reader) bool) error); ok {
+	if rf, ok := ret.Get(0).(func(func(uint, byte, ExtendedReader) bool) error); ok {
 		r0 = rf(until)
 	} else {
 		r0 = ret.Error(0)
@@ -185,14 +184,14 @@ type MockTransportInstance_FillBuffer_Call struct {
 }
 
 // FillBuffer is a helper method to define mock.On call
-//   - until func(uint , byte , *bufio.Reader) bool
+//   - until func(uint , byte , ExtendedReader) bool
 func (_e *MockTransportInstance_Expecter) FillBuffer(until interface{}) *MockTransportInstance_FillBuffer_Call {
 	return &MockTransportInstance_FillBuffer_Call{Call: _e.mock.On("FillBuffer", until)}
 }
 
-func (_c *MockTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, *bufio.Reader) bool)) *MockTransportInstance_FillBuffer_Call {
+func (_c *MockTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, ExtendedReader) bool)) *MockTransportInstance_FillBuffer_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(func(uint, byte, *bufio.Reader) bool))
+		run(args[0].(func(uint, byte, ExtendedReader) bool))
 	})
 	return _c
 }
@@ -202,7 +201,7 @@ func (_c *MockTransportInstance_FillBuffer_Call) Return(_a0 error) *MockTranspor
 	return _c
 }
 
-func (_c *MockTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, *bufio.Reader) bool) error) *MockTransportInstance_FillBuffer_Call {
+func (_c *MockTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, ExtendedReader) bool) error) *MockTransportInstance_FillBuffer_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/plc4go/spi/transports/pcap/TransportInstance.go b/plc4go/spi/transports/pcap/TransportInstance.go
index 30361c358e..20a0c7e9df 100644
--- a/plc4go/spi/transports/pcap/TransportInstance.go
+++ b/plc4go/spi/transports/pcap/TransportInstance.go
@@ -30,6 +30,7 @@ import (
 	"time"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	transportUtils "github.com/apache/plc4x/plc4go/spi/transports/utils"
 
 	"github.com/gopacket/gopacket"
@@ -169,7 +170,7 @@ func (m *TransportInstance) Write(_ []byte) error {
 	return errors.New("Write to pcap not supported")
 }
 
-func (m *TransportInstance) GetReader() *bufio.Reader {
+func (m *TransportInstance) GetReader() transports.ExtendedReader {
 	return m.reader
 }
 
diff --git a/plc4go/spi/transports/serial/TransportInstance.go b/plc4go/spi/transports/serial/TransportInstance.go
index 514fc4ef01..33afeaf212 100644
--- a/plc4go/spi/transports/serial/TransportInstance.go
+++ b/plc4go/spi/transports/serial/TransportInstance.go
@@ -22,12 +22,15 @@ package serial
 import (
 	"bufio"
 	"fmt"
+	"io"
+
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	transportUtils "github.com/apache/plc4x/plc4go/spi/transports/utils"
+
 	"github.com/jacobsa/go-serial/serial"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
-	"io"
 )
 
 type TransportInstance struct {
@@ -106,7 +109,7 @@ func (m *TransportInstance) Write(data []byte) error {
 	return nil
 }
 
-func (m *TransportInstance) GetReader() *bufio.Reader {
+func (m *TransportInstance) GetReader() transports.ExtendedReader {
 	return m.reader
 }
 
diff --git a/plc4go/spi/transports/tcp/TransportInstance.go b/plc4go/spi/transports/tcp/TransportInstance.go
index 2c060c076e..6fdf65b2c7 100644
--- a/plc4go/spi/transports/tcp/TransportInstance.go
+++ b/plc4go/spi/transports/tcp/TransportInstance.go
@@ -26,6 +26,7 @@ import (
 	"net"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	transportUtils "github.com/apache/plc4x/plc4go/spi/transports/utils"
 
 	"github.com/pkg/errors"
@@ -34,12 +35,14 @@ import (
 
 type TransportInstance struct {
 	transportUtils.DefaultBufferedTransportInstance
+
 	RemoteAddress  *net.TCPAddr
 	LocalAddress   *net.TCPAddr
 	ConnectTimeout uint32
-	transport      *Transport
-	tcpConn        net.Conn
-	reader         *bufio.Reader
+
+	transport *Transport
+	tcpConn   net.Conn
+	reader    *bufio.Reader
 
 	log zerolog.Logger
 }
@@ -108,7 +111,7 @@ func (m *TransportInstance) Write(data []byte) error {
 	return nil
 }
 
-func (m *TransportInstance) GetReader() *bufio.Reader {
+func (m *TransportInstance) GetReader() transports.ExtendedReader {
 	return m.reader
 }
 
diff --git a/plc4go/spi/transports/tcp/TransportInstance_test.go b/plc4go/spi/transports/tcp/TransportInstance_test.go
index 7c929b69f4..64149d8220 100644
--- a/plc4go/spi/transports/tcp/TransportInstance_test.go
+++ b/plc4go/spi/transports/tcp/TransportInstance_test.go
@@ -20,11 +20,11 @@
 package tcp
 
 import (
-	"bufio"
 	"context"
 	"net"
 	"testing"
 
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	transportUtils "github.com/apache/plc4x/plc4go/spi/transports/utils"
 
 	"github.com/stretchr/testify/assert"
@@ -69,43 +69,44 @@ func TestTransportInstance_Close(t *testing.T) {
 		ConnectTimeout                   uint32
 		transport                        *Transport
 		tcpConn                          net.Conn
-		reader                           *bufio.Reader
+		reader                           transports.ExtendedReader
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		wantErr bool
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, ti *TransportInstance)
+		wantErr     bool
 	}{
 		{
 			name: "close it (no conn)",
 		},
 		{
 			name: "close it (broken connection)",
-			fields: fields{
-				tcpConn: &net.TCPConn{},
+			manipulator: func(t *testing.T, ti *TransportInstance) {
+				var tcpConn net.Conn = &net.TCPConn{}
+				ti.tcpConn.Store(&tcpConn)
 			},
 			wantErr: true,
 		},
 		{
 			name: "close it",
-			fields: fields{
-				tcpConn: func() *net.TCPConn {
-					listener, err := nettest.NewLocalListener("tcp")
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						assert.NoError(t, listener.Close())
-					})
-					go func() {
-						_, _ = listener.Accept()
-					}()
-					tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						// As we already closed the connection with the whole method this should error
-						assert.Error(t, tcp.Close())
-					})
-					return tcp
-				}(),
+			manipulator: func(t *testing.T, ti *TransportInstance) {
+				listener, err := nettest.NewLocalListener("tcp")
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, listener.Close())
+				})
+				go func() {
+					_, _ = listener.Accept()
+				}()
+				tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					// As we already closed the connection with the whole method this should error
+					assert.Error(t, tcp.Close())
+				})
+				var tcpConn net.Conn = tcp
+				ti.tcpConn.Store(&tcpConn)
 			},
 		},
 	}
@@ -117,8 +118,9 @@ func TestTransportInstance_Close(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
 			}
 			if err := m.Close(); (err != nil) != tt.wantErr {
 				t.Errorf("Close() error = %v, wantErr %v", err, tt.wantErr)
@@ -134,8 +136,6 @@ func TestTransportInstance_Connect(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	tests := []struct {
 		name    string
@@ -155,8 +155,6 @@ func TestTransportInstance_Connect(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if err := m.Connect(); (err != nil) != tt.wantErr {
 				t.Errorf("Connect() error = %v, wantErr %v", err, tt.wantErr)
@@ -172,8 +170,6 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	type args struct {
 		ctx context.Context
@@ -218,8 +214,6 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if err := m.ConnectWithContext(tt.args.ctx); (err != nil) != tt.wantErr {
 				t.Errorf("ConnectWithContext() error = %v, wantErr %v", err, tt.wantErr)
@@ -235,13 +229,11 @@ func TestTransportInstance_GetReader(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	tests := []struct {
 		name   string
 		fields fields
-		want   *bufio.Reader
+		want   transports.ExtendedReader
 	}{
 		{
 			name: "get it",
@@ -255,8 +247,6 @@ func TestTransportInstance_GetReader(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if got := m.GetReader(); !assert.Equal(t, tt.want, got) {
 				t.Errorf("GetReader() = %v, want %v", got, tt.want)
@@ -272,8 +262,6 @@ func TestTransportInstance_IsConnected(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	tests := []struct {
 		name   string
@@ -292,8 +280,6 @@ func TestTransportInstance_IsConnected(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if got := m.IsConnected(); got != tt.want {
 				t.Errorf("IsConnected() = %v, want %v", got, tt.want)
@@ -309,8 +295,6 @@ func TestTransportInstance_String(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	tests := []struct {
 		name   string
@@ -338,8 +322,6 @@ func TestTransportInstance_String(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if got := m.String(); got != tt.want {
 				t.Errorf("String() = %v, want %v", got, tt.want)
@@ -355,17 +337,16 @@ func TestTransportInstance_Write(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	type args struct {
 		data []byte
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, ti *TransportInstance)
+		wantErr     bool
 	}{
 		{
 			name:    "write it (failing)",
@@ -373,30 +354,30 @@ func TestTransportInstance_Write(t *testing.T) {
 		},
 		{
 			name: "write it (failing with con)",
-			fields: fields{
-				tcpConn: &net.TCPConn{},
+			manipulator: func(t *testing.T, ti *TransportInstance) {
+				var tcpConn net.Conn = &net.TCPConn{}
+				ti.tcpConn.Store(&tcpConn)
 			},
 			wantErr: true,
 		},
 		{
 			name: "write it",
-			fields: fields{
-				tcpConn: func() *net.TCPConn {
-					listener, err := nettest.NewLocalListener("tcp")
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						assert.NoError(t, listener.Close())
-					})
-					go func() {
-						_, _ = listener.Accept()
-					}()
-					tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						assert.NoError(t, tcp.Close())
-					})
-					return tcp
-				}(),
+			manipulator: func(t *testing.T, ti *TransportInstance) {
+				listener, err := nettest.NewLocalListener("tcp")
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, listener.Close())
+				})
+				go func() {
+					_, _ = listener.Accept()
+				}()
+				tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, tcp.Close())
+				})
+				var tcpConn net.Conn = tcp
+				ti.tcpConn.Store(&tcpConn)
 			},
 		},
 	}
@@ -408,8 +389,9 @@ func TestTransportInstance_Write(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
 			}
 			if err := m.Write(tt.args.data); (err != nil) != tt.wantErr {
 				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go
index 972a5f4b8c..b01d94ef3b 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -28,6 +28,7 @@ import (
 	"sync"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
@@ -98,7 +99,7 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
 	return uint32(readableBytes), nil
 }
 
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
 	if !m.IsConnected() {
 		panic(errors.New("working on a unconnected connection"))
 	}
diff --git a/plc4go/spi/transports/test/TransportInstance_test.go b/plc4go/spi/transports/test/TransportInstance_test.go
index 36fdb95e97..78cbf318b3 100644
--- a/plc4go/spi/transports/test/TransportInstance_test.go
+++ b/plc4go/spi/transports/test/TransportInstance_test.go
@@ -20,10 +20,11 @@
 package test
 
 import (
-	"bufio"
 	"context"
 	"testing"
 
+	"github.com/apache/plc4x/plc4go/spi/transports"
+
 	"github.com/stretchr/testify/assert"
 )
 
@@ -205,7 +206,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
 	type args struct {
-		until func(pos uint, currentByte byte, reader *bufio.Reader) bool
+		until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
 	}
 	tests := []struct {
 		name    string
@@ -219,7 +220,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 				connected: true,
 			},
 			args: args{
-				until: func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+				until: func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					return pos < 3
 				},
 			},
@@ -232,7 +233,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 				readBuffer: []byte{1, 2, 3, 4},
 			},
 			args: args{
-				until: func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+				until: func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					return pos < 3
 				},
 			},
diff --git a/plc4go/spi/transports/udp/TransportInstance.go b/plc4go/spi/transports/udp/TransportInstance.go
index 7be389f70f..c99a4a2780 100644
--- a/plc4go/spi/transports/udp/TransportInstance.go
+++ b/plc4go/spi/transports/udp/TransportInstance.go
@@ -26,6 +26,7 @@ import (
 	"net"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 
 	"github.com/libp2p/go-reuseport"
 	"github.com/pkg/errors"
@@ -133,7 +134,7 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
 	return uint32(m.reader.Buffered()), nil
 }
 
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
 	nBytes := uint32(1)
 	for {
 		_bytes, err := m.PeekReadableBytes(nBytes)
diff --git a/plc4go/spi/transports/udp/TransportInstance_test.go b/plc4go/spi/transports/udp/TransportInstance_test.go
index c973797222..7424fa472b 100644
--- a/plc4go/spi/transports/udp/TransportInstance_test.go
+++ b/plc4go/spi/transports/udp/TransportInstance_test.go
@@ -23,6 +23,7 @@ import (
 	"bufio"
 	"bytes"
 	"context"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	"net"
 	"testing"
 
@@ -299,7 +300,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 		reader         *bufio.Reader
 	}
 	type args struct {
-		until func(pos uint, currentByte byte, reader *bufio.Reader) bool
+		until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
 	}
 	tests := []struct {
 		name    string
@@ -317,7 +318,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 				reader: bufio.NewReader(bytes.NewReader([]byte{1, 2, 3, 4})),
 			},
 			args: args{
-				until: func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+				until: func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					return pos < 2
 				},
 			},
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
index ac94e35625..6406964125 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
@@ -20,25 +20,25 @@
 package utils
 
 import (
-	"bufio"
 	"context"
 	"runtime/debug"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 )
 
 type DefaultBufferedTransportInstanceRequirements interface {
-	GetReader() *bufio.Reader
+	GetReader() transports.ExtendedReader
 	Connect() error
 }
 
 type DefaultBufferedTransportInstance interface {
 	ConnectWithContext(ctx context.Context) error
 	GetNumBytesAvailableInBuffer() (uint32, error)
-	FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error
+	FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error
 	PeekReadableBytes(numBytes uint32) ([]byte, error)
 	Read(numBytes uint32) ([]byte, error)
 }
@@ -84,7 +84,7 @@ func (m *defaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint3
 	return uint32(m.GetReader().Buffered()), nil
 }
 
-func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
 	if m.GetReader() == nil {
 		return nil
 	}
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
index b9cfe9842b..9d13f28f32 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
@@ -26,6 +26,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/plc4x/plc4go/spi/transports"
+
 	"github.com/stretchr/testify/assert"
 )
 
@@ -118,7 +120,7 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 		DefaultBufferedTransportInstanceRequirements DefaultBufferedTransportInstanceRequirements
 	}
 	type args struct {
-		until func(pos uint, currentByte byte, reader *bufio.Reader) bool
+		until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
 	}
 	tests := []struct {
 		name      string
@@ -138,7 +140,7 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 		},
 		{
 			name: "fill it with reader",
-			args: args{func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+			args: args{func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 				return pos < 1
 			}},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
@@ -149,7 +151,7 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 		},
 		{
 			name: "fill it with reader errors",
-			args: args{func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+			args: args{func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 				return pos < 2
 			}},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
diff --git a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
index 0fe10042e2..c5bb927ebf 100644
--- a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
+++ b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
@@ -22,8 +22,7 @@
 package utils
 
 import (
-	bufio "bufio"
-
+	transports "github.com/apache/plc4x/plc4go/spi/transports"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -82,15 +81,15 @@ func (_c *MockDefaultBufferedTransportInstanceRequirements_Connect_Call) RunAndR
 }
 
 // GetReader provides a mock function with given fields:
-func (_m *MockDefaultBufferedTransportInstanceRequirements) GetReader() *bufio.Reader {
+func (_m *MockDefaultBufferedTransportInstanceRequirements) GetReader() transports.ExtendedReader {
 	ret := _m.Called()
 
-	var r0 *bufio.Reader
-	if rf, ok := ret.Get(0).(func() *bufio.Reader); ok {
+	var r0 transports.ExtendedReader
+	if rf, ok := ret.Get(0).(func() transports.ExtendedReader); ok {
 		r0 = rf()
 	} else {
 		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*bufio.Reader)
+			r0 = ret.Get(0).(transports.ExtendedReader)
 		}
 	}
 
@@ -114,12 +113,12 @@ func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) Run(r
 	return _c
 }
 
-func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) Return(_a0 *bufio.Reader) *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call {
+func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) Return(_a0 transports.ExtendedReader) *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call {
 	_c.Call.Return(_a0)
 	return _c
 }
 
-func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) RunAndReturn(run func() *bufio.Reader) *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call {
+func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) RunAndReturn(run func() transports.ExtendedReader) *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstance_test.go b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstance_test.go
index a89041df2a..1a30cadcb0 100644
--- a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstance_test.go
+++ b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstance_test.go
@@ -22,9 +22,9 @@
 package utils
 
 import (
-	bufio "bufio"
 	context "context"
 
+	transports "github.com/apache/plc4x/plc4go/spi/transports"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -84,11 +84,11 @@ func (_c *MockDefaultBufferedTransportInstance_ConnectWithContext_Call) RunAndRe
 }
 
 // FillBuffer provides a mock function with given fields: until
-func (_m *MockDefaultBufferedTransportInstance) FillBuffer(until func(uint, byte, *bufio.Reader) bool) error {
+func (_m *MockDefaultBufferedTransportInstance) FillBuffer(until func(uint, byte, transports.ExtendedReader) bool) error {
 	ret := _m.Called(until)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(func(uint, byte, *bufio.Reader) bool) error); ok {
+	if rf, ok := ret.Get(0).(func(func(uint, byte, transports.ExtendedReader) bool) error); ok {
 		r0 = rf(until)
 	} else {
 		r0 = ret.Error(0)
@@ -103,14 +103,14 @@ type MockDefaultBufferedTransportInstance_FillBuffer_Call struct {
 }
 
 // FillBuffer is a helper method to define mock.On call
-//   - until func(uint , byte , *bufio.Reader) bool
+//   - until func(uint , byte , transports.ExtendedReader) bool
 func (_e *MockDefaultBufferedTransportInstance_Expecter) FillBuffer(until interface{}) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
 	return &MockDefaultBufferedTransportInstance_FillBuffer_Call{Call: _e.mock.On("FillBuffer", until)}
 }
 
-func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, *bufio.Reader) bool)) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
+func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, transports.ExtendedReader) bool)) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(func(uint, byte, *bufio.Reader) bool))
+		run(args[0].(func(uint, byte, transports.ExtendedReader) bool))
 	})
 	return _c
 }
@@ -120,7 +120,7 @@ func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) Return(_a0 error
 	return _c
 }
 
-func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, *bufio.Reader) bool) error) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
+func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, transports.ExtendedReader) bool) error) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
 	_c.Call.Return(run)
 	return _c
 }


[plc4x] 01/04: fix(plc4go/tools): licenser should now output the right file name

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 16e91ad87fe7c05eecd98039398d68a5a637ada4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 19:30:23 2023 +0200

    fix(plc4go/tools): licenser should now output the right file name
---
 plc4go/tools/plc4xlicenser/gen.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/plc4go/tools/plc4xlicenser/gen.go b/plc4go/tools/plc4xlicenser/gen.go
index 9517c55bce..4aa44c68a1 100644
--- a/plc4go/tools/plc4xlicenser/gen.go
+++ b/plc4go/tools/plc4xlicenser/gen.go
@@ -118,7 +118,7 @@ func main() {
 	if err := os.WriteFile(outputName, append(licenceContent, inputFile...), 0644); err != nil {
 		log.Fatalf("writing output: %s", err)
 	}
-	fmt.Printf("Fixed plc4x license of %s\n", licenseFileNameWithPath)
+	fmt.Printf("Fixed plc4x license of %s\n", outputName)
 }
 
 // isDirectory reports whether the named file is a directory.


[plc4x] 04/04: test(plc4go/cbus): set executor for browse test

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit e5cf78485b3b6b2bffd91f596fa2f3ed64bec893
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 20 08:03:10 2023 +0200

    test(plc4go/cbus): set executor for browse test
---
 plc4go/internal/cbus/Browser_test.go | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 3fef9b7ee8..c1860d88e8 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -35,7 +35,9 @@ import (
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
 	spiModel "github.com/apache/plc4x/plc4go/spi/model"
+	"github.com/apache/plc4x/plc4go/spi/pool"
 	"github.com/apache/plc4x/plc4go/spi/testutils"
+	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
 	"github.com/apache/plc4x/plc4go/spi/utils"
@@ -88,6 +90,13 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
+				executor := pool.NewFixedSizeExecutor(10, 10, _options...)
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				_options = append(_options,
+					transactions.WithCustomExecutor(executor),
+				)
+
 				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
@@ -246,6 +255,11 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
+				executor := pool.NewFixedSizeExecutor(10, 10, _options...)
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				_options = append(_options, transactions.WithCustomExecutor(executor))
+
 				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)


[plc4x] 03/04: fix(plc4go/spi): sync tcp.TransportInstance state change

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 14c59f0f4ab5a636bd200b7d18fc655ba4f8132b
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 20 08:02:28 2023 +0200

    fix(plc4go/spi): sync tcp.TransportInstance state change
---
 plc4go/spi/default/DefaultCodec.go             |  1 +
 plc4go/spi/transports/tcp/TransportInstance.go | 29 +++++++++++++++++++-------
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index e542681a74..4c7b99bca2 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -168,6 +168,7 @@ func (m *defaultCodec) Disconnect() error {
 	m.activeWorker.Wait()
 	m.log.Trace().Msg("worker shut down")
 	if m.transportInstance != nil {
+		m.log.Trace().Msg("closing transport instance")
 		if err := m.transportInstance.Close(); err != nil {
 			return errors.Wrap(err, "error closing transport instance")
 		}
diff --git a/plc4go/spi/transports/tcp/TransportInstance.go b/plc4go/spi/transports/tcp/TransportInstance.go
index 6fdf65b2c7..3e5e7ff705 100644
--- a/plc4go/spi/transports/tcp/TransportInstance.go
+++ b/plc4go/spi/transports/tcp/TransportInstance.go
@@ -24,6 +24,8 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"sync"
+	"sync/atomic"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
@@ -41,8 +43,12 @@ type TransportInstance struct {
 	ConnectTimeout uint32
 
 	transport *Transport
-	tcpConn   net.Conn
-	reader    *bufio.Reader
+
+	tcpConn net.Conn
+	reader  *bufio.Reader
+
+	connected        atomic.Bool
+	stateChangeMutex sync.RWMutex
 
 	log zerolog.Logger
 }
@@ -64,6 +70,11 @@ func (m *TransportInstance) Connect() error {
 }
 
 func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
+	if m.connected.Load() {
+		return errors.New("already connected")
+	}
+	m.stateChangeMutex.Lock()
+	defer m.stateChangeMutex.Unlock()
 	if m.RemoteAddress == nil {
 		return errors.New("Required remote address missing")
 	}
@@ -78,27 +89,29 @@ func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
 
 	m.reader = bufio.NewReaderSize(m.tcpConn, 100000)
 
+	m.connected.Store(true)
 	return nil
 }
 
 func (m *TransportInstance) Close() error {
-	if m.tcpConn == nil {
+	m.stateChangeMutex.Lock()
+	defer m.stateChangeMutex.Unlock()
+	if !m.connected.Load() {
 		return nil
 	}
-	err := m.tcpConn.Close()
-	if err != nil {
+	if err := m.tcpConn.Close(); err != nil {
 		return errors.Wrap(err, "error closing connection")
 	}
-	m.tcpConn = nil
+	m.connected.Store(false)
 	return nil
 }
 
 func (m *TransportInstance) IsConnected() bool {
-	return m.tcpConn != nil
+	return m.connected.Load()
 }
 
 func (m *TransportInstance) Write(data []byte) error {
-	if m.tcpConn == nil {
+	if !m.connected.Load() {
 		return errors.New("error writing to transport. No writer available")
 	}
 	num, err := m.tcpConn.Write(data)