You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/20 06:03:19 UTC

[plc4x] 02/04: refactor(plc4go/spi): abstract bufio.Reader through an interface

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 62bc2ae77f8e416fe14caae9b0e588ce983b64e3
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 20:41:57 2023 +0200

    refactor(plc4go/spi): abstract bufio.Reader through an interface
---
 plc4go/internal/ads/MessageCodec.go                |  10 +-
 plc4go/internal/cbus/MessageCodec.go               |  13 +-
 plc4go/spi/default/mock_TransportInstance_test.go  |  14 +-
 .../testutils/mock_TestTransportInstance_test.go   |  14 +-
 plc4go/spi/testutils/mock_WithOption_test.go       |  93 --------
 .../{TransportInstance.go => ExtendedReader.go}    |  29 +--
 plc4go/spi/transports/TransportInstance.go         |   3 +-
 plc4go/spi/transports/mock_ExtendedReader_test.go  | 249 +++++++++++++++++++++
 .../spi/transports/mock_TransportInstance_test.go  |  13 +-
 plc4go/spi/transports/pcap/TransportInstance.go    |   3 +-
 plc4go/spi/transports/serial/TransportInstance.go  |   7 +-
 plc4go/spi/transports/tcp/TransportInstance.go     |  11 +-
 .../spi/transports/tcp/TransportInstance_test.go   | 132 +++++------
 plc4go/spi/transports/test/TransportInstance.go    |   3 +-
 .../spi/transports/test/TransportInstance_test.go  |   9 +-
 plc4go/spi/transports/udp/TransportInstance.go     |   3 +-
 .../spi/transports/udp/TransportInstance_test.go   |   5 +-
 .../utils/DefaultBufferedTransportInstance.go      |   8 +-
 .../utils/DefaultBufferedTransportInstance_test.go |   8 +-
 ...ltBufferedTransportInstanceRequirements_test.go |  15 +-
 .../mock_DefaultBufferedTransportInstance_test.go  |  14 +-
 21 files changed, 397 insertions(+), 259 deletions(-)

diff --git a/plc4go/internal/ads/MessageCodec.go b/plc4go/internal/ads/MessageCodec.go
index 75f85416a4..e711f1d899 100644
--- a/plc4go/internal/ads/MessageCodec.go
+++ b/plc4go/internal/ads/MessageCodec.go
@@ -20,18 +20,18 @@
 package ads
 
 import (
-	"bufio"
 	"context"
 	"encoding/binary"
-	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/rs/zerolog"
 
 	"github.com/apache/plc4x/plc4go/protocols/ads/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/utils"
+
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
 )
 
 type MessageCodec struct {
@@ -87,7 +87,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 	transportInstance := m.GetTransportInstance()
 
 	if err := transportInstance.FillBuffer(
-		func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+		func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 			numBytesAvailable, err := transportInstance.GetNumBytesAvailableInBuffer()
 			if err != nil {
 				return false
@@ -110,7 +110,7 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 		packetSize := (uint32(data[5]) << 24) + (uint32(data[4]) << 16) + (uint32(data[3]) << 8) + (uint32(data[2])) + 6
 		if num < packetSize {
 			if err := transportInstance.FillBuffer(
-				func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+				func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					numBytesAvailable, err := transportInstance.GetNumBytesAvailableInBuffer()
 					if err != nil {
 						return false
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index e2856f57c2..7151da9dba 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -20,16 +20,16 @@
 package cbus
 
 import (
-	"bufio"
 	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
-	"sync"
-	"sync/atomic"
-	"time"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
@@ -129,10 +129,13 @@ func (m *MessageCodec) Send(message spi.Message) error {
 func (m *MessageCodec) Receive() (spi.Message, error) {
 	m.log.Trace().Msg("Receive")
 	ti := m.GetTransportInstance()
+	if !ti.IsConnected() {
+		return nil, errors.New("Transport instance not connected")
+	}
 	confirmation := false
 	// Fill the buffer
 	{
-		if err := ti.FillBuffer(func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+		if err := ti.FillBuffer(func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 			m.log.Trace().Uint8("byte", currentByte).Msg("current byte")
 			switch currentByte {
 			case
diff --git a/plc4go/spi/default/mock_TransportInstance_test.go b/plc4go/spi/default/mock_TransportInstance_test.go
index 93e23c116b..f331da8781 100644
--- a/plc4go/spi/default/mock_TransportInstance_test.go
+++ b/plc4go/spi/default/mock_TransportInstance_test.go
@@ -22,9 +22,9 @@
 package _default
 
 import (
-	bufio "bufio"
 	context "context"
 
+	transports "github.com/apache/plc4x/plc4go/spi/transports"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -166,11 +166,11 @@ func (_c *MockTransportInstance_ConnectWithContext_Call) RunAndReturn(run func(c
 }
 
 // FillBuffer provides a mock function with given fields: until
-func (_m *MockTransportInstance) FillBuffer(until func(uint, byte, *bufio.Reader) bool) error {
+func (_m *MockTransportInstance) FillBuffer(until func(uint, byte, transports.ExtendedReader) bool) error {
 	ret := _m.Called(until)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(func(uint, byte, *bufio.Reader) bool) error); ok {
+	if rf, ok := ret.Get(0).(func(func(uint, byte, transports.ExtendedReader) bool) error); ok {
 		r0 = rf(until)
 	} else {
 		r0 = ret.Error(0)
@@ -185,14 +185,14 @@ type MockTransportInstance_FillBuffer_Call struct {
 }
 
 // FillBuffer is a helper method to define mock.On call
-//   - until func(uint , byte , *bufio.Reader) bool
+//   - until func(uint , byte , transports.ExtendedReader) bool
 func (_e *MockTransportInstance_Expecter) FillBuffer(until interface{}) *MockTransportInstance_FillBuffer_Call {
 	return &MockTransportInstance_FillBuffer_Call{Call: _e.mock.On("FillBuffer", until)}
 }
 
-func (_c *MockTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, *bufio.Reader) bool)) *MockTransportInstance_FillBuffer_Call {
+func (_c *MockTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, transports.ExtendedReader) bool)) *MockTransportInstance_FillBuffer_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(func(uint, byte, *bufio.Reader) bool))
+		run(args[0].(func(uint, byte, transports.ExtendedReader) bool))
 	})
 	return _c
 }
@@ -202,7 +202,7 @@ func (_c *MockTransportInstance_FillBuffer_Call) Return(_a0 error) *MockTranspor
 	return _c
 }
 
-func (_c *MockTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, *bufio.Reader) bool) error) *MockTransportInstance_FillBuffer_Call {
+func (_c *MockTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, transports.ExtendedReader) bool) error) *MockTransportInstance_FillBuffer_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/plc4go/spi/testutils/mock_TestTransportInstance_test.go b/plc4go/spi/testutils/mock_TestTransportInstance_test.go
index 8c71e00a23..ed0975f6d3 100644
--- a/plc4go/spi/testutils/mock_TestTransportInstance_test.go
+++ b/plc4go/spi/testutils/mock_TestTransportInstance_test.go
@@ -22,9 +22,9 @@
 package testutils
 
 import (
-	bufio "bufio"
 	context "context"
 
+	transports "github.com/apache/plc4x/plc4go/spi/transports"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -210,11 +210,11 @@ func (_c *MockTestTransportInstance_DrainWriteBuffer_Call) RunAndReturn(run func
 }
 
 // FillBuffer provides a mock function with given fields: until
-func (_m *MockTestTransportInstance) FillBuffer(until func(uint, byte, *bufio.Reader) bool) error {
+func (_m *MockTestTransportInstance) FillBuffer(until func(uint, byte, transports.ExtendedReader) bool) error {
 	ret := _m.Called(until)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(func(uint, byte, *bufio.Reader) bool) error); ok {
+	if rf, ok := ret.Get(0).(func(func(uint, byte, transports.ExtendedReader) bool) error); ok {
 		r0 = rf(until)
 	} else {
 		r0 = ret.Error(0)
@@ -229,14 +229,14 @@ type MockTestTransportInstance_FillBuffer_Call struct {
 }
 
 // FillBuffer is a helper method to define mock.On call
-//   - until func(uint , byte , *bufio.Reader) bool
+//   - until func(uint , byte , transports.ExtendedReader) bool
 func (_e *MockTestTransportInstance_Expecter) FillBuffer(until interface{}) *MockTestTransportInstance_FillBuffer_Call {
 	return &MockTestTransportInstance_FillBuffer_Call{Call: _e.mock.On("FillBuffer", until)}
 }
 
-func (_c *MockTestTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, *bufio.Reader) bool)) *MockTestTransportInstance_FillBuffer_Call {
+func (_c *MockTestTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, transports.ExtendedReader) bool)) *MockTestTransportInstance_FillBuffer_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(func(uint, byte, *bufio.Reader) bool))
+		run(args[0].(func(uint, byte, transports.ExtendedReader) bool))
 	})
 	return _c
 }
@@ -246,7 +246,7 @@ func (_c *MockTestTransportInstance_FillBuffer_Call) Return(_a0 error) *MockTest
 	return _c
 }
 
-func (_c *MockTestTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, *bufio.Reader) bool) error) *MockTestTransportInstance_FillBuffer_Call {
+func (_c *MockTestTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, transports.ExtendedReader) bool) error) *MockTestTransportInstance_FillBuffer_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/plc4go/spi/testutils/mock_WithOption_test.go b/plc4go/spi/testutils/mock_WithOption_test.go
deleted file mode 100644
index a4c12463e2..0000000000
--- a/plc4go/spi/testutils/mock_WithOption_test.go
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// Code generated by mockery v2.28.2. DO NOT EDIT.
-
-package testutils
-
-import mock "github.com/stretchr/testify/mock"
-
-// MockWithOption is an autogenerated mock type for the WithOption type
-type MockWithOption struct {
-	mock.Mock
-}
-
-type MockWithOption_Expecter struct {
-	mock *mock.Mock
-}
-
-func (_m *MockWithOption) EXPECT() *MockWithOption_Expecter {
-	return &MockWithOption_Expecter{mock: &_m.Mock}
-}
-
-// isOption provides a mock function with given fields:
-func (_m *MockWithOption) isOption() bool {
-	ret := _m.Called()
-
-	var r0 bool
-	if rf, ok := ret.Get(0).(func() bool); ok {
-		r0 = rf()
-	} else {
-		r0 = ret.Get(0).(bool)
-	}
-
-	return r0
-}
-
-// MockWithOption_isOption_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'isOption'
-type MockWithOption_isOption_Call struct {
-	*mock.Call
-}
-
-// isOption is a helper method to define mock.On call
-func (_e *MockWithOption_Expecter) isOption() *MockWithOption_isOption_Call {
-	return &MockWithOption_isOption_Call{Call: _e.mock.On("isOption")}
-}
-
-func (_c *MockWithOption_isOption_Call) Run(run func()) *MockWithOption_isOption_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run()
-	})
-	return _c
-}
-
-func (_c *MockWithOption_isOption_Call) Return(_a0 bool) *MockWithOption_isOption_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
-
-func (_c *MockWithOption_isOption_Call) RunAndReturn(run func() bool) *MockWithOption_isOption_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-type mockConstructorTestingTNewMockWithOption interface {
-	mock.TestingT
-	Cleanup(func())
-}
-
-// NewMockWithOption creates a new instance of MockWithOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-func NewMockWithOption(t mockConstructorTestingTNewMockWithOption) *MockWithOption {
-	mock := &MockWithOption{}
-	mock.Mock.Test(t)
-
-	t.Cleanup(func() { mock.AssertExpectations(t) })
-
-	return mock
-}
diff --git a/plc4go/spi/transports/TransportInstance.go b/plc4go/spi/transports/ExtendedReader.go
similarity index 54%
copy from plc4go/spi/transports/TransportInstance.go
copy to plc4go/spi/transports/ExtendedReader.go
index e81d6821b9..bb9fe3e847 100644
--- a/plc4go/spi/transports/TransportInstance.go
+++ b/plc4go/spi/transports/ExtendedReader.go
@@ -19,26 +19,13 @@
 
 package transports
 
-import (
-	"bufio"
-	"context"
-	"fmt"
-)
+import "io"
 
-type TransportInstance interface {
-	fmt.Stringer
-	Connect() error
-	ConnectWithContext(ctx context.Context) error
-	Close() error
-
-	IsConnected() bool
-
-	// FillBuffer fills the buffer `until` false (Useful in conjunction if you want GetNumBytesAvailableInBuffer)
-	FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error
-	// GetNumBytesAvailableInBuffer returns the bytes currently available in buffer (!!!Careful: if you looking for a termination you have to use FillBuffer)
-	GetNumBytesAvailableInBuffer() (uint32, error)
-	PeekReadableBytes(numBytes uint32) ([]byte, error)
-	Read(numBytes uint32) ([]byte, error)
-
-	Write(data []byte) error
+type ExtendedReader interface {
+	io.Reader
+	io.ByteReader
+	// Peek returns the next n bytes without advancing the reader.
+	Peek(int) ([]byte, error)
+	// Buffered returns the number of bytes that can be read from the current buffer.
+	Buffered() int
 }
diff --git a/plc4go/spi/transports/TransportInstance.go b/plc4go/spi/transports/TransportInstance.go
index e81d6821b9..ee121c3a87 100644
--- a/plc4go/spi/transports/TransportInstance.go
+++ b/plc4go/spi/transports/TransportInstance.go
@@ -20,7 +20,6 @@
 package transports
 
 import (
-	"bufio"
 	"context"
 	"fmt"
 )
@@ -34,7 +33,7 @@ type TransportInstance interface {
 	IsConnected() bool
 
 	// FillBuffer fills the buffer `until` false (Useful in conjunction if you want GetNumBytesAvailableInBuffer)
-	FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error
+	FillBuffer(until func(pos uint, currentByte byte, reader ExtendedReader) bool) error
 	// GetNumBytesAvailableInBuffer returns the bytes currently available in buffer (!!!Careful: if you looking for a termination you have to use FillBuffer)
 	GetNumBytesAvailableInBuffer() (uint32, error)
 	PeekReadableBytes(numBytes uint32) ([]byte, error)
diff --git a/plc4go/spi/transports/mock_ExtendedReader_test.go b/plc4go/spi/transports/mock_ExtendedReader_test.go
new file mode 100644
index 0000000000..9b9e291e77
--- /dev/null
+++ b/plc4go/spi/transports/mock_ExtendedReader_test.go
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Code generated by mockery v2.30.1. DO NOT EDIT.
+
+package transports
+
+import mock "github.com/stretchr/testify/mock"
+
+// MockExtendedReader is an autogenerated mock type for the ExtendedReader type
+type MockExtendedReader struct {
+	mock.Mock
+}
+
+type MockExtendedReader_Expecter struct {
+	mock *mock.Mock
+}
+
+func (_m *MockExtendedReader) EXPECT() *MockExtendedReader_Expecter {
+	return &MockExtendedReader_Expecter{mock: &_m.Mock}
+}
+
+// Buffered provides a mock function with given fields:
+func (_m *MockExtendedReader) Buffered() int {
+	ret := _m.Called()
+
+	var r0 int
+	if rf, ok := ret.Get(0).(func() int); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(int)
+	}
+
+	return r0
+}
+
+// MockExtendedReader_Buffered_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Buffered'
+type MockExtendedReader_Buffered_Call struct {
+	*mock.Call
+}
+
+// Buffered is a helper method to define mock.On call
+func (_e *MockExtendedReader_Expecter) Buffered() *MockExtendedReader_Buffered_Call {
+	return &MockExtendedReader_Buffered_Call{Call: _e.mock.On("Buffered")}
+}
+
+func (_c *MockExtendedReader_Buffered_Call) Run(run func()) *MockExtendedReader_Buffered_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExtendedReader_Buffered_Call) Return(_a0 int) *MockExtendedReader_Buffered_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExtendedReader_Buffered_Call) RunAndReturn(run func() int) *MockExtendedReader_Buffered_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Peek provides a mock function with given fields: _a0
+func (_m *MockExtendedReader) Peek(_a0 int) ([]byte, error) {
+	ret := _m.Called(_a0)
+
+	var r0 []byte
+	var r1 error
+	if rf, ok := ret.Get(0).(func(int) ([]byte, error)); ok {
+		return rf(_a0)
+	}
+	if rf, ok := ret.Get(0).(func(int) []byte); ok {
+		r0 = rf(_a0)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]byte)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(int) error); ok {
+		r1 = rf(_a0)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockExtendedReader_Peek_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Peek'
+type MockExtendedReader_Peek_Call struct {
+	*mock.Call
+}
+
+// Peek is a helper method to define mock.On call
+//   - _a0 int
+func (_e *MockExtendedReader_Expecter) Peek(_a0 interface{}) *MockExtendedReader_Peek_Call {
+	return &MockExtendedReader_Peek_Call{Call: _e.mock.On("Peek", _a0)}
+}
+
+func (_c *MockExtendedReader_Peek_Call) Run(run func(_a0 int)) *MockExtendedReader_Peek_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(int))
+	})
+	return _c
+}
+
+func (_c *MockExtendedReader_Peek_Call) Return(_a0 []byte, _a1 error) *MockExtendedReader_Peek_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockExtendedReader_Peek_Call) RunAndReturn(run func(int) ([]byte, error)) *MockExtendedReader_Peek_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Read provides a mock function with given fields: p
+func (_m *MockExtendedReader) Read(p []byte) (int, error) {
+	ret := _m.Called(p)
+
+	var r0 int
+	var r1 error
+	if rf, ok := ret.Get(0).(func([]byte) (int, error)); ok {
+		return rf(p)
+	}
+	if rf, ok := ret.Get(0).(func([]byte) int); ok {
+		r0 = rf(p)
+	} else {
+		r0 = ret.Get(0).(int)
+	}
+
+	if rf, ok := ret.Get(1).(func([]byte) error); ok {
+		r1 = rf(p)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockExtendedReader_Read_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Read'
+type MockExtendedReader_Read_Call struct {
+	*mock.Call
+}
+
+// Read is a helper method to define mock.On call
+//   - p []byte
+func (_e *MockExtendedReader_Expecter) Read(p interface{}) *MockExtendedReader_Read_Call {
+	return &MockExtendedReader_Read_Call{Call: _e.mock.On("Read", p)}
+}
+
+func (_c *MockExtendedReader_Read_Call) Run(run func(p []byte)) *MockExtendedReader_Read_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].([]byte))
+	})
+	return _c
+}
+
+func (_c *MockExtendedReader_Read_Call) Return(n int, err error) *MockExtendedReader_Read_Call {
+	_c.Call.Return(n, err)
+	return _c
+}
+
+func (_c *MockExtendedReader_Read_Call) RunAndReturn(run func([]byte) (int, error)) *MockExtendedReader_Read_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// ReadByte provides a mock function with given fields:
+func (_m *MockExtendedReader) ReadByte() (byte, error) {
+	ret := _m.Called()
+
+	var r0 byte
+	var r1 error
+	if rf, ok := ret.Get(0).(func() (byte, error)); ok {
+		return rf()
+	}
+	if rf, ok := ret.Get(0).(func() byte); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(byte)
+	}
+
+	if rf, ok := ret.Get(1).(func() error); ok {
+		r1 = rf()
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockExtendedReader_ReadByte_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadByte'
+type MockExtendedReader_ReadByte_Call struct {
+	*mock.Call
+}
+
+// ReadByte is a helper method to define mock.On call
+func (_e *MockExtendedReader_Expecter) ReadByte() *MockExtendedReader_ReadByte_Call {
+	return &MockExtendedReader_ReadByte_Call{Call: _e.mock.On("ReadByte")}
+}
+
+func (_c *MockExtendedReader_ReadByte_Call) Run(run func()) *MockExtendedReader_ReadByte_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExtendedReader_ReadByte_Call) Return(_a0 byte, _a1 error) *MockExtendedReader_ReadByte_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockExtendedReader_ReadByte_Call) RunAndReturn(run func() (byte, error)) *MockExtendedReader_ReadByte_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// NewMockExtendedReader creates a new instance of MockExtendedReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMockExtendedReader(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *MockExtendedReader {
+	mock := &MockExtendedReader{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/plc4go/spi/transports/mock_TransportInstance_test.go b/plc4go/spi/transports/mock_TransportInstance_test.go
index 2055e06b88..d85643393f 100644
--- a/plc4go/spi/transports/mock_TransportInstance_test.go
+++ b/plc4go/spi/transports/mock_TransportInstance_test.go
@@ -22,7 +22,6 @@
 package transports
 
 import (
-	bufio "bufio"
 	context "context"
 
 	mock "github.com/stretchr/testify/mock"
@@ -166,11 +165,11 @@ func (_c *MockTransportInstance_ConnectWithContext_Call) RunAndReturn(run func(c
 }
 
 // FillBuffer provides a mock function with given fields: until
-func (_m *MockTransportInstance) FillBuffer(until func(uint, byte, *bufio.Reader) bool) error {
+func (_m *MockTransportInstance) FillBuffer(until func(uint, byte, ExtendedReader) bool) error {
 	ret := _m.Called(until)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(func(uint, byte, *bufio.Reader) bool) error); ok {
+	if rf, ok := ret.Get(0).(func(func(uint, byte, ExtendedReader) bool) error); ok {
 		r0 = rf(until)
 	} else {
 		r0 = ret.Error(0)
@@ -185,14 +184,14 @@ type MockTransportInstance_FillBuffer_Call struct {
 }
 
 // FillBuffer is a helper method to define mock.On call
-//   - until func(uint , byte , *bufio.Reader) bool
+//   - until func(uint , byte , ExtendedReader) bool
 func (_e *MockTransportInstance_Expecter) FillBuffer(until interface{}) *MockTransportInstance_FillBuffer_Call {
 	return &MockTransportInstance_FillBuffer_Call{Call: _e.mock.On("FillBuffer", until)}
 }
 
-func (_c *MockTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, *bufio.Reader) bool)) *MockTransportInstance_FillBuffer_Call {
+func (_c *MockTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, ExtendedReader) bool)) *MockTransportInstance_FillBuffer_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(func(uint, byte, *bufio.Reader) bool))
+		run(args[0].(func(uint, byte, ExtendedReader) bool))
 	})
 	return _c
 }
@@ -202,7 +201,7 @@ func (_c *MockTransportInstance_FillBuffer_Call) Return(_a0 error) *MockTranspor
 	return _c
 }
 
-func (_c *MockTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, *bufio.Reader) bool) error) *MockTransportInstance_FillBuffer_Call {
+func (_c *MockTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, ExtendedReader) bool) error) *MockTransportInstance_FillBuffer_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/plc4go/spi/transports/pcap/TransportInstance.go b/plc4go/spi/transports/pcap/TransportInstance.go
index 30361c358e..20a0c7e9df 100644
--- a/plc4go/spi/transports/pcap/TransportInstance.go
+++ b/plc4go/spi/transports/pcap/TransportInstance.go
@@ -30,6 +30,7 @@ import (
 	"time"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	transportUtils "github.com/apache/plc4x/plc4go/spi/transports/utils"
 
 	"github.com/gopacket/gopacket"
@@ -169,7 +170,7 @@ func (m *TransportInstance) Write(_ []byte) error {
 	return errors.New("Write to pcap not supported")
 }
 
-func (m *TransportInstance) GetReader() *bufio.Reader {
+func (m *TransportInstance) GetReader() transports.ExtendedReader {
 	return m.reader
 }
 
diff --git a/plc4go/spi/transports/serial/TransportInstance.go b/plc4go/spi/transports/serial/TransportInstance.go
index 514fc4ef01..33afeaf212 100644
--- a/plc4go/spi/transports/serial/TransportInstance.go
+++ b/plc4go/spi/transports/serial/TransportInstance.go
@@ -22,12 +22,15 @@ package serial
 import (
 	"bufio"
 	"fmt"
+	"io"
+
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	transportUtils "github.com/apache/plc4x/plc4go/spi/transports/utils"
+
 	"github.com/jacobsa/go-serial/serial"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
-	"io"
 )
 
 type TransportInstance struct {
@@ -106,7 +109,7 @@ func (m *TransportInstance) Write(data []byte) error {
 	return nil
 }
 
-func (m *TransportInstance) GetReader() *bufio.Reader {
+func (m *TransportInstance) GetReader() transports.ExtendedReader {
 	return m.reader
 }
 
diff --git a/plc4go/spi/transports/tcp/TransportInstance.go b/plc4go/spi/transports/tcp/TransportInstance.go
index 2c060c076e..6fdf65b2c7 100644
--- a/plc4go/spi/transports/tcp/TransportInstance.go
+++ b/plc4go/spi/transports/tcp/TransportInstance.go
@@ -26,6 +26,7 @@ import (
 	"net"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	transportUtils "github.com/apache/plc4x/plc4go/spi/transports/utils"
 
 	"github.com/pkg/errors"
@@ -34,12 +35,14 @@ import (
 
 type TransportInstance struct {
 	transportUtils.DefaultBufferedTransportInstance
+
 	RemoteAddress  *net.TCPAddr
 	LocalAddress   *net.TCPAddr
 	ConnectTimeout uint32
-	transport      *Transport
-	tcpConn        net.Conn
-	reader         *bufio.Reader
+
+	transport *Transport
+	tcpConn   net.Conn
+	reader    *bufio.Reader
 
 	log zerolog.Logger
 }
@@ -108,7 +111,7 @@ func (m *TransportInstance) Write(data []byte) error {
 	return nil
 }
 
-func (m *TransportInstance) GetReader() *bufio.Reader {
+func (m *TransportInstance) GetReader() transports.ExtendedReader {
 	return m.reader
 }
 
diff --git a/plc4go/spi/transports/tcp/TransportInstance_test.go b/plc4go/spi/transports/tcp/TransportInstance_test.go
index 7c929b69f4..64149d8220 100644
--- a/plc4go/spi/transports/tcp/TransportInstance_test.go
+++ b/plc4go/spi/transports/tcp/TransportInstance_test.go
@@ -20,11 +20,11 @@
 package tcp
 
 import (
-	"bufio"
 	"context"
 	"net"
 	"testing"
 
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	transportUtils "github.com/apache/plc4x/plc4go/spi/transports/utils"
 
 	"github.com/stretchr/testify/assert"
@@ -69,43 +69,44 @@ func TestTransportInstance_Close(t *testing.T) {
 		ConnectTimeout                   uint32
 		transport                        *Transport
 		tcpConn                          net.Conn
-		reader                           *bufio.Reader
+		reader                           transports.ExtendedReader
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		wantErr bool
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, ti *TransportInstance)
+		wantErr     bool
 	}{
 		{
 			name: "close it (no conn)",
 		},
 		{
 			name: "close it (broken connection)",
-			fields: fields{
-				tcpConn: &net.TCPConn{},
+			manipulator: func(t *testing.T, ti *TransportInstance) {
+				var tcpConn net.Conn = &net.TCPConn{}
+				ti.tcpConn.Store(&tcpConn)
 			},
 			wantErr: true,
 		},
 		{
 			name: "close it",
-			fields: fields{
-				tcpConn: func() *net.TCPConn {
-					listener, err := nettest.NewLocalListener("tcp")
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						assert.NoError(t, listener.Close())
-					})
-					go func() {
-						_, _ = listener.Accept()
-					}()
-					tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						// As we already closed the connection with the whole method this should error
-						assert.Error(t, tcp.Close())
-					})
-					return tcp
-				}(),
+			manipulator: func(t *testing.T, ti *TransportInstance) {
+				listener, err := nettest.NewLocalListener("tcp")
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, listener.Close())
+				})
+				go func() {
+					_, _ = listener.Accept()
+				}()
+				tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					// As we already closed the connection with the whole method this should error
+					assert.Error(t, tcp.Close())
+				})
+				var tcpConn net.Conn = tcp
+				ti.tcpConn.Store(&tcpConn)
 			},
 		},
 	}
@@ -117,8 +118,9 @@ func TestTransportInstance_Close(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
 			}
 			if err := m.Close(); (err != nil) != tt.wantErr {
 				t.Errorf("Close() error = %v, wantErr %v", err, tt.wantErr)
@@ -134,8 +136,6 @@ func TestTransportInstance_Connect(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	tests := []struct {
 		name    string
@@ -155,8 +155,6 @@ func TestTransportInstance_Connect(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if err := m.Connect(); (err != nil) != tt.wantErr {
 				t.Errorf("Connect() error = %v, wantErr %v", err, tt.wantErr)
@@ -172,8 +170,6 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	type args struct {
 		ctx context.Context
@@ -218,8 +214,6 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if err := m.ConnectWithContext(tt.args.ctx); (err != nil) != tt.wantErr {
 				t.Errorf("ConnectWithContext() error = %v, wantErr %v", err, tt.wantErr)
@@ -235,13 +229,11 @@ func TestTransportInstance_GetReader(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	tests := []struct {
 		name   string
 		fields fields
-		want   *bufio.Reader
+		want   transports.ExtendedReader
 	}{
 		{
 			name: "get it",
@@ -255,8 +247,6 @@ func TestTransportInstance_GetReader(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if got := m.GetReader(); !assert.Equal(t, tt.want, got) {
 				t.Errorf("GetReader() = %v, want %v", got, tt.want)
@@ -272,8 +262,6 @@ func TestTransportInstance_IsConnected(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	tests := []struct {
 		name   string
@@ -292,8 +280,6 @@ func TestTransportInstance_IsConnected(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if got := m.IsConnected(); got != tt.want {
 				t.Errorf("IsConnected() = %v, want %v", got, tt.want)
@@ -309,8 +295,6 @@ func TestTransportInstance_String(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	tests := []struct {
 		name   string
@@ -338,8 +322,6 @@ func TestTransportInstance_String(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
 			}
 			if got := m.String(); got != tt.want {
 				t.Errorf("String() = %v, want %v", got, tt.want)
@@ -355,17 +337,16 @@ func TestTransportInstance_Write(t *testing.T) {
 		LocalAddress                     *net.TCPAddr
 		ConnectTimeout                   uint32
 		transport                        *Transport
-		tcpConn                          net.Conn
-		reader                           *bufio.Reader
 	}
 	type args struct {
 		data []byte
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, ti *TransportInstance)
+		wantErr     bool
 	}{
 		{
 			name:    "write it (failing)",
@@ -373,30 +354,30 @@ func TestTransportInstance_Write(t *testing.T) {
 		},
 		{
 			name: "write it (failing with con)",
-			fields: fields{
-				tcpConn: &net.TCPConn{},
+			manipulator: func(t *testing.T, ti *TransportInstance) {
+				var tcpConn net.Conn = &net.TCPConn{}
+				ti.tcpConn.Store(&tcpConn)
 			},
 			wantErr: true,
 		},
 		{
 			name: "write it",
-			fields: fields{
-				tcpConn: func() *net.TCPConn {
-					listener, err := nettest.NewLocalListener("tcp")
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						assert.NoError(t, listener.Close())
-					})
-					go func() {
-						_, _ = listener.Accept()
-					}()
-					tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						assert.NoError(t, tcp.Close())
-					})
-					return tcp
-				}(),
+			manipulator: func(t *testing.T, ti *TransportInstance) {
+				listener, err := nettest.NewLocalListener("tcp")
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, listener.Close())
+				})
+				go func() {
+					_, _ = listener.Accept()
+				}()
+				tcp, err := net.DialTCP("tcp", nil, listener.Addr().(*net.TCPAddr))
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, tcp.Close())
+				})
+				var tcpConn net.Conn = tcp
+				ti.tcpConn.Store(&tcpConn)
 			},
 		},
 	}
@@ -408,8 +389,9 @@ func TestTransportInstance_Write(t *testing.T) {
 				LocalAddress:                     tt.fields.LocalAddress,
 				ConnectTimeout:                   tt.fields.ConnectTimeout,
 				transport:                        tt.fields.transport,
-				tcpConn:                          tt.fields.tcpConn,
-				reader:                           tt.fields.reader,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
 			}
 			if err := m.Write(tt.args.data); (err != nil) != tt.wantErr {
 				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go
index 972a5f4b8c..b01d94ef3b 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -28,6 +28,7 @@ import (
 	"sync"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
@@ -98,7 +99,7 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
 	return uint32(readableBytes), nil
 }
 
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
 	if !m.IsConnected() {
 		panic(errors.New("working on a unconnected connection"))
 	}
diff --git a/plc4go/spi/transports/test/TransportInstance_test.go b/plc4go/spi/transports/test/TransportInstance_test.go
index 36fdb95e97..78cbf318b3 100644
--- a/plc4go/spi/transports/test/TransportInstance_test.go
+++ b/plc4go/spi/transports/test/TransportInstance_test.go
@@ -20,10 +20,11 @@
 package test
 
 import (
-	"bufio"
 	"context"
 	"testing"
 
+	"github.com/apache/plc4x/plc4go/spi/transports"
+
 	"github.com/stretchr/testify/assert"
 )
 
@@ -205,7 +206,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
 	type args struct {
-		until func(pos uint, currentByte byte, reader *bufio.Reader) bool
+		until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
 	}
 	tests := []struct {
 		name    string
@@ -219,7 +220,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 				connected: true,
 			},
 			args: args{
-				until: func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+				until: func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					return pos < 3
 				},
 			},
@@ -232,7 +233,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 				readBuffer: []byte{1, 2, 3, 4},
 			},
 			args: args{
-				until: func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+				until: func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					return pos < 3
 				},
 			},
diff --git a/plc4go/spi/transports/udp/TransportInstance.go b/plc4go/spi/transports/udp/TransportInstance.go
index 7be389f70f..c99a4a2780 100644
--- a/plc4go/spi/transports/udp/TransportInstance.go
+++ b/plc4go/spi/transports/udp/TransportInstance.go
@@ -26,6 +26,7 @@ import (
 	"net"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 
 	"github.com/libp2p/go-reuseport"
 	"github.com/pkg/errors"
@@ -133,7 +134,7 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
 	return uint32(m.reader.Buffered()), nil
 }
 
-func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
 	nBytes := uint32(1)
 	for {
 		_bytes, err := m.PeekReadableBytes(nBytes)
diff --git a/plc4go/spi/transports/udp/TransportInstance_test.go b/plc4go/spi/transports/udp/TransportInstance_test.go
index c973797222..7424fa472b 100644
--- a/plc4go/spi/transports/udp/TransportInstance_test.go
+++ b/plc4go/spi/transports/udp/TransportInstance_test.go
@@ -23,6 +23,7 @@ import (
 	"bufio"
 	"bytes"
 	"context"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 	"net"
 	"testing"
 
@@ -299,7 +300,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 		reader         *bufio.Reader
 	}
 	type args struct {
-		until func(pos uint, currentByte byte, reader *bufio.Reader) bool
+		until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
 	}
 	tests := []struct {
 		name    string
@@ -317,7 +318,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 				reader: bufio.NewReader(bytes.NewReader([]byte{1, 2, 3, 4})),
 			},
 			args: args{
-				until: func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+				until: func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					return pos < 2
 				},
 			},
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
index ac94e35625..6406964125 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
@@ -20,25 +20,25 @@
 package utils
 
 import (
-	"bufio"
 	"context"
 	"runtime/debug"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/transports"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 )
 
 type DefaultBufferedTransportInstanceRequirements interface {
-	GetReader() *bufio.Reader
+	GetReader() transports.ExtendedReader
 	Connect() error
 }
 
 type DefaultBufferedTransportInstance interface {
 	ConnectWithContext(ctx context.Context) error
 	GetNumBytesAvailableInBuffer() (uint32, error)
-	FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error
+	FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error
 	PeekReadableBytes(numBytes uint32) ([]byte, error)
 	Read(numBytes uint32) ([]byte, error)
 }
@@ -84,7 +84,7 @@ func (m *defaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint3
 	return uint32(m.GetReader().Buffered()), nil
 }
 
-func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
 	if m.GetReader() == nil {
 		return nil
 	}
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
index b9cfe9842b..9d13f28f32 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
@@ -26,6 +26,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/plc4x/plc4go/spi/transports"
+
 	"github.com/stretchr/testify/assert"
 )
 
@@ -118,7 +120,7 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 		DefaultBufferedTransportInstanceRequirements DefaultBufferedTransportInstanceRequirements
 	}
 	type args struct {
-		until func(pos uint, currentByte byte, reader *bufio.Reader) bool
+		until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
 	}
 	tests := []struct {
 		name      string
@@ -138,7 +140,7 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 		},
 		{
 			name: "fill it with reader",
-			args: args{func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+			args: args{func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 				return pos < 1
 			}},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
@@ -149,7 +151,7 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 		},
 		{
 			name: "fill it with reader errors",
-			args: args{func(pos uint, currentByte byte, reader *bufio.Reader) bool {
+			args: args{func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 				return pos < 2
 			}},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
diff --git a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
index 0fe10042e2..c5bb927ebf 100644
--- a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
+++ b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
@@ -22,8 +22,7 @@
 package utils
 
 import (
-	bufio "bufio"
-
+	transports "github.com/apache/plc4x/plc4go/spi/transports"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -82,15 +81,15 @@ func (_c *MockDefaultBufferedTransportInstanceRequirements_Connect_Call) RunAndR
 }
 
 // GetReader provides a mock function with given fields:
-func (_m *MockDefaultBufferedTransportInstanceRequirements) GetReader() *bufio.Reader {
+func (_m *MockDefaultBufferedTransportInstanceRequirements) GetReader() transports.ExtendedReader {
 	ret := _m.Called()
 
-	var r0 *bufio.Reader
-	if rf, ok := ret.Get(0).(func() *bufio.Reader); ok {
+	var r0 transports.ExtendedReader
+	if rf, ok := ret.Get(0).(func() transports.ExtendedReader); ok {
 		r0 = rf()
 	} else {
 		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*bufio.Reader)
+			r0 = ret.Get(0).(transports.ExtendedReader)
 		}
 	}
 
@@ -114,12 +113,12 @@ func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) Run(r
 	return _c
 }
 
-func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) Return(_a0 *bufio.Reader) *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call {
+func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) Return(_a0 transports.ExtendedReader) *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call {
 	_c.Call.Return(_a0)
 	return _c
 }
 
-func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) RunAndReturn(run func() *bufio.Reader) *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call {
+func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) RunAndReturn(run func() transports.ExtendedReader) *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call {
 	_c.Call.Return(run)
 	return _c
 }
diff --git a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstance_test.go b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstance_test.go
index a89041df2a..1a30cadcb0 100644
--- a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstance_test.go
+++ b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstance_test.go
@@ -22,9 +22,9 @@
 package utils
 
 import (
-	bufio "bufio"
 	context "context"
 
+	transports "github.com/apache/plc4x/plc4go/spi/transports"
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -84,11 +84,11 @@ func (_c *MockDefaultBufferedTransportInstance_ConnectWithContext_Call) RunAndRe
 }
 
 // FillBuffer provides a mock function with given fields: until
-func (_m *MockDefaultBufferedTransportInstance) FillBuffer(until func(uint, byte, *bufio.Reader) bool) error {
+func (_m *MockDefaultBufferedTransportInstance) FillBuffer(until func(uint, byte, transports.ExtendedReader) bool) error {
 	ret := _m.Called(until)
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(func(uint, byte, *bufio.Reader) bool) error); ok {
+	if rf, ok := ret.Get(0).(func(func(uint, byte, transports.ExtendedReader) bool) error); ok {
 		r0 = rf(until)
 	} else {
 		r0 = ret.Error(0)
@@ -103,14 +103,14 @@ type MockDefaultBufferedTransportInstance_FillBuffer_Call struct {
 }
 
 // FillBuffer is a helper method to define mock.On call
-//   - until func(uint , byte , *bufio.Reader) bool
+//   - until func(uint , byte , transports.ExtendedReader) bool
 func (_e *MockDefaultBufferedTransportInstance_Expecter) FillBuffer(until interface{}) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
 	return &MockDefaultBufferedTransportInstance_FillBuffer_Call{Call: _e.mock.On("FillBuffer", until)}
 }
 
-func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, *bufio.Reader) bool)) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
+func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) Run(run func(until func(uint, byte, transports.ExtendedReader) bool)) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(func(uint, byte, *bufio.Reader) bool))
+		run(args[0].(func(uint, byte, transports.ExtendedReader) bool))
 	})
 	return _c
 }
@@ -120,7 +120,7 @@ func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) Return(_a0 error
 	return _c
 }
 
-func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, *bufio.Reader) bool) error) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
+func (_c *MockDefaultBufferedTransportInstance_FillBuffer_Call) RunAndReturn(run func(func(uint, byte, transports.ExtendedReader) bool) error) *MockDefaultBufferedTransportInstance_FillBuffer_Call {
 	_c.Call.Return(run)
 	return _c
 }