You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/20 07:24:21 UTC

[plc4x] branch develop updated: fix(plc4go/cbus): ensure TransportInstances are properly synced

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 73317f81ae fix(plc4go/cbus): ensure TransportInstances are properly synced
73317f81ae is described below

commit 73317f81ae3a410753775d713577378868b03e8c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 20 09:24:14 2023 +0200

    fix(plc4go/cbus): ensure TransportInstances are properly synced
---
 plc4go/spi/transports/pcap/TransportInstance.go    |   3 +
 plc4go/spi/transports/serial/TransportInstance.go  |  27 +++-
 plc4go/spi/transports/tcp/TransportInstance.go     |   2 +-
 plc4go/spi/transports/test/TransportInstance.go    |  36 +++--
 .../spi/transports/test/TransportInstance_test.go  | 170 +++++++++++----------
 plc4go/spi/transports/udp/TransportInstance.go     |  45 ++++--
 .../spi/transports/udp/TransportInstance_test.go   | 157 ++++++++++++-------
 .../utils/DefaultBufferedTransportInstance.go      |  13 ++
 .../utils/DefaultBufferedTransportInstance_test.go |  37 +++--
 ...ltBufferedTransportInstanceRequirements_test.go |  41 +++++
 10 files changed, 360 insertions(+), 171 deletions(-)

diff --git a/plc4go/spi/transports/pcap/TransportInstance.go b/plc4go/spi/transports/pcap/TransportInstance.go
index 20a0c7e9df..3f396d9e93 100644
--- a/plc4go/spi/transports/pcap/TransportInstance.go
+++ b/plc4go/spi/transports/pcap/TransportInstance.go
@@ -167,6 +167,9 @@ func (m *TransportInstance) IsConnected() bool {
 }
 
 func (m *TransportInstance) Write(_ []byte) error {
+	if !m.connected.Load() {
+		return errors.New("error writing to transport. No writer available")
+	}
 	return errors.New("Write to pcap not supported")
 }
 
diff --git a/plc4go/spi/transports/serial/TransportInstance.go b/plc4go/spi/transports/serial/TransportInstance.go
index 33afeaf212..afa5e90322 100644
--- a/plc4go/spi/transports/serial/TransportInstance.go
+++ b/plc4go/spi/transports/serial/TransportInstance.go
@@ -23,6 +23,8 @@ import (
 	"bufio"
 	"fmt"
 	"io"
+	"sync"
+	"sync/atomic"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
@@ -35,12 +37,17 @@ import (
 
 type TransportInstance struct {
 	transportUtils.DefaultBufferedTransportInstance
+
 	SerialPortName string
 	BaudRate       uint
 	ConnectTimeout uint32
-	transport      *Transport
-	serialPort     io.ReadWriteCloser
-	reader         *bufio.Reader
+
+	connected        atomic.Bool
+	stateChangeMutex sync.Mutex
+
+	transport  *Transport
+	serialPort io.ReadWriteCloser
+	reader     *bufio.Reader
 
 	log zerolog.Logger
 }
@@ -59,6 +66,12 @@ func NewTransportInstance(serialPortName string, baudRate uint, connectTimeout u
 }
 
 func (m *TransportInstance) Connect() error {
+	m.stateChangeMutex.Lock()
+	defer m.stateChangeMutex.Unlock()
+	if m.connected.Load() {
+		return errors.New("Already connected")
+	}
+
 	var err error
 	config := serial.OpenOptions{PortName: m.SerialPortName, BaudRate: m.BaudRate, DataBits: 8, StopBits: 1, MinimumReadSize: 0, InterCharacterTimeout: 100 /*, RTSCTSFlowControl: true*/}
 	m.serialPort, err = serial.Open(config)
@@ -80,6 +93,9 @@ func (m *TransportInstance) Connect() error {
 }
 
 func (m *TransportInstance) Close() error {
+	m.stateChangeMutex.Lock()
+	defer m.stateChangeMutex.Unlock()
+
 	if m.serialPort == nil {
 		return nil
 	}
@@ -88,6 +104,8 @@ func (m *TransportInstance) Close() error {
 		return errors.Wrap(err, "error closing serial port")
 	}
 	m.serialPort = nil
+
+	m.connected.Store(false)
 	return nil
 }
 
@@ -96,6 +114,9 @@ func (m *TransportInstance) IsConnected() bool {
 }
 
 func (m *TransportInstance) Write(data []byte) error {
+	if !m.connected.Load() {
+		return errors.New("error writing to transport. Not connected")
+	}
 	if m.serialPort == nil {
 		return errors.New("error writing to transport. No writer available")
 	}
diff --git a/plc4go/spi/transports/tcp/TransportInstance.go b/plc4go/spi/transports/tcp/TransportInstance.go
index 3e5e7ff705..df2ffc8be8 100644
--- a/plc4go/spi/transports/tcp/TransportInstance.go
+++ b/plc4go/spi/transports/tcp/TransportInstance.go
@@ -112,7 +112,7 @@ func (m *TransportInstance) IsConnected() bool {
 
 func (m *TransportInstance) Write(data []byte) error {
 	if !m.connected.Load() {
-		return errors.New("error writing to transport. No writer available")
+		return errors.New("error writing to transport. Not connected")
 	}
 	num, err := m.tcpConn.Write(data)
 	if err != nil {
diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go
index b01d94ef3b..936f4dae40 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -26,6 +26,7 @@ import (
 	"encoding/hex"
 	"math"
 	"sync"
+	"sync/atomic"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
@@ -37,11 +38,11 @@ import (
 type TransportInstance struct {
 	readBuffer       []byte
 	writeBuffer      []byte
-	connected        bool
 	transport        *Transport
 	writeInterceptor func(transportInstance *TransportInstance, data []byte)
 
 	dataMutex        sync.RWMutex
+	connected        atomic.Bool
 	stateChangeMutex sync.RWMutex
 
 	log zerolog.Logger
@@ -51,7 +52,6 @@ func NewTransportInstance(transport *Transport, _options ...options.WithOption)
 	return &TransportInstance{
 		readBuffer:  []byte{},
 		writeBuffer: []byte{},
-		connected:   false,
 		transport:   transport,
 
 		log: options.ExtractCustomLogger(_options...),
@@ -61,12 +61,12 @@ func NewTransportInstance(transport *Transport, _options ...options.WithOption)
 func (m *TransportInstance) Connect() error {
 	m.stateChangeMutex.Lock()
 	defer m.stateChangeMutex.Unlock()
-	if m.connected {
+	if m.connected.Load() {
 		m.log.Warn().Msg("already connected")
 		return nil
 	}
 	m.log.Trace().Msg("Connect")
-	m.connected = true
+	m.connected.Store(true)
 	return nil
 }
 
@@ -77,15 +77,16 @@ func (m *TransportInstance) ConnectWithContext(_ context.Context) error {
 func (m *TransportInstance) Close() error {
 	m.stateChangeMutex.Lock()
 	defer m.stateChangeMutex.Unlock()
+	if !m.connected.Load() {
+		return nil
+	}
 	m.log.Trace().Msg("Close")
-	m.connected = false
+	m.connected.Store(true)
 	return nil
 }
 
 func (m *TransportInstance) IsConnected() bool {
-	m.stateChangeMutex.RLock()
-	defer m.stateChangeMutex.RUnlock()
-	return m.connected
+	return m.connected.Load()
 }
 
 func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
@@ -101,7 +102,7 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
 
 func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
 	if !m.IsConnected() {
-		panic(errors.New("working on a unconnected connection"))
+		return errors.New("working on a unconnected connection")
 	}
 	m.log.Trace().Msg("Fill the buffer")
 	nBytes := uint32(1)
@@ -125,7 +126,7 @@ func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, re
 
 func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
 	if !m.IsConnected() {
-		panic(errors.New("working on a unconnected connection"))
+		return nil, errors.New("working on a unconnected connection")
 	}
 	m.dataMutex.RLock()
 	defer m.dataMutex.RUnlock()
@@ -144,7 +145,7 @@ func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
 
 func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {
 	if !m.IsConnected() {
-		panic(errors.New("working on a unconnected connection"))
+		return nil, errors.New("working on a unconnected connection")
 	}
 	m.dataMutex.Lock()
 	defer m.dataMutex.Unlock()
@@ -165,7 +166,7 @@ func (m *TransportInstance) SetWriteInterceptor(writeInterceptor func(transportI
 
 func (m *TransportInstance) Write(data []byte) error {
 	if !m.IsConnected() {
-		panic(errors.New("working on a unconnected connection"))
+		return errors.New("working on a unconnected connection")
 	}
 	if m.writeInterceptor != nil {
 		m.log.Trace().Msgf("Passing data to write interceptor\n%s", hex.Dump(data))
@@ -180,7 +181,8 @@ func (m *TransportInstance) Write(data []byte) error {
 
 func (m *TransportInstance) FillReadBuffer(data []byte) {
 	if !m.IsConnected() {
-		panic(errors.New("working on a unconnected connection"))
+		m.log.Error().Msg("working on a unconnected connection")
+		return
 	}
 	m.dataMutex.Lock()
 	defer m.dataMutex.Unlock()
@@ -190,7 +192,8 @@ func (m *TransportInstance) FillReadBuffer(data []byte) {
 
 func (m *TransportInstance) GetNumDrainableBytes() uint32 {
 	if !m.IsConnected() {
-		panic(errors.New("working on a unconnected connection"))
+		m.log.Error().Msg("working on a unconnected connection")
+		return 0
 	}
 	m.dataMutex.RLock()
 	defer m.dataMutex.RUnlock()
@@ -200,7 +203,8 @@ func (m *TransportInstance) GetNumDrainableBytes() uint32 {
 
 func (m *TransportInstance) DrainWriteBuffer(numBytes uint32) []byte {
 	if !m.IsConnected() {
-		panic(errors.New("working on a unconnected connection"))
+		m.log.Error().Msg("working on a unconnected connection")
+		return nil
 	}
 	m.dataMutex.Lock()
 	defer m.dataMutex.Unlock()
@@ -211,5 +215,5 @@ func (m *TransportInstance) DrainWriteBuffer(numBytes uint32) []byte {
 }
 
 func (m *TransportInstance) String() string {
-	return "test" //TODO: maybe use plc4xgen
+	return "test"
 }
diff --git a/plc4go/spi/transports/test/TransportInstance_test.go b/plc4go/spi/transports/test/TransportInstance_test.go
index 78cbf318b3..fd903d2f28 100644
--- a/plc4go/spi/transports/test/TransportInstance_test.go
+++ b/plc4go/spi/transports/test/TransportInstance_test.go
@@ -58,7 +58,6 @@ func TestTransportInstance_Close(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -76,7 +75,6 @@ func TestTransportInstance_Close(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
@@ -91,7 +89,6 @@ func TestTransportInstance_Connect(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -109,7 +106,6 @@ func TestTransportInstance_Connect(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
@@ -124,7 +120,6 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -146,7 +141,6 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
@@ -161,7 +155,6 @@ func TestTransportInstance_DrainWriteBuffer(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -169,15 +162,16 @@ func TestTransportInstance_DrainWriteBuffer(t *testing.T) {
 		numBytes uint32
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		args   args
-		want   []byte
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
+		want        []byte
 	}{
 		{
 			name: "drain it",
-			fields: fields{
-				connected: true,
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
 			},
 		},
 	}
@@ -186,10 +180,12 @@ func TestTransportInstance_DrainWriteBuffer(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			if got := m.DrainWriteBuffer(tt.args.numBytes); !assert.Equal(t, tt.want, got) {
 				t.Errorf("DrainWriteBuffer() = %v, want %v", got, tt.want)
 			}
@@ -201,7 +197,6 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -209,27 +204,27 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 		until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
+		wantErr     bool
 	}{
 		{
 			name: "fill it (errors)",
-			fields: fields{
-				connected: true,
-			},
 			args: args{
 				until: func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
 					return pos < 3
 				},
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			wantErr: true,
 		},
 		{
 			name: "fill it",
 			fields: fields{
-				connected:  true,
 				readBuffer: []byte{1, 2, 3, 4},
 			},
 			args: args{
@@ -237,6 +232,9 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 					return pos < 3
 				},
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -244,10 +242,12 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			if err := m.FillBuffer(tt.args.until); (err != nil) != tt.wantErr {
 				t.Errorf("FillBuffer() error = %v, wantErr %v", err, tt.wantErr)
 			}
@@ -259,7 +259,6 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -267,19 +266,22 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) {
 		data []byte
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		args   args
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
 	}{
 		{
 			name: "fill it",
 			fields: fields{
-				connected:  true,
 				readBuffer: []byte{1, 2, 3, 4},
 			},
 			args: args{
 				data: []byte{1, 2, 3, 4},
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -287,10 +289,12 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			m.FillReadBuffer(tt.args.data)
 		})
 	}
@@ -300,28 +304,30 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		want    uint32
-		wantErr bool
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, instance *TransportInstance)
+		want        uint32
+		wantErr     bool
 	}{
 		{
 			name: "get it",
-			fields: fields{
-				connected: true,
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
 			},
 		},
 		{
 			name: "get it too",
 			fields: fields{
-				connected:  true,
 				readBuffer: []byte{1, 2, 3, 4},
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			want: 4,
 		},
 	}
@@ -330,10 +336,12 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			got, err := m.GetNumBytesAvailableInBuffer()
 			if (err != nil) != tt.wantErr {
 				t.Errorf("GetNumBytesAvailableInBuffer() error = %v, wantErr %v", err, tt.wantErr)
@@ -350,19 +358,19 @@ func TestTransportInstance_GetNumDrainableBytes(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		want   uint32
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, instance *TransportInstance)
+		want        uint32
 	}{
 		{
 			name: "get it",
-			fields: fields{
-				connected: true,
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
 			},
 		},
 	}
@@ -371,10 +379,12 @@ func TestTransportInstance_GetNumDrainableBytes(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			if got := m.GetNumDrainableBytes(); got != tt.want {
 				t.Errorf("GetNumDrainableBytes() = %v, want %v", got, tt.want)
 			}
@@ -386,7 +396,6 @@ func TestTransportInstance_IsConnected(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -404,7 +413,6 @@ func TestTransportInstance_IsConnected(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
@@ -419,7 +427,6 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -427,16 +434,17 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
 		numBytes uint32
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		want    []byte
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
+		want        []byte
+		wantErr     bool
 	}{
 		{
 			name: "peek it",
-			fields: fields{
-				connected: true,
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
 			},
 		},
 	}
@@ -445,10 +453,12 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			got, err := m.PeekReadableBytes(tt.args.numBytes)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("PeekReadableBytes() error = %v, wantErr %v", err, tt.wantErr)
@@ -465,7 +475,6 @@ func TestTransportInstance_Read(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -473,16 +482,17 @@ func TestTransportInstance_Read(t *testing.T) {
 		numBytes uint32
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		want    []byte
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
+		want        []byte
+		wantErr     bool
 	}{
 		{
 			name: "read it",
-			fields: fields{
-				connected: true,
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
 			},
 			wantErr: true,
 		},
@@ -492,10 +502,12 @@ func TestTransportInstance_Read(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			got, err := m.Read(tt.args.numBytes)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("Read() error = %v, wantErr %v", err, tt.wantErr)
@@ -512,7 +524,6 @@ func TestTransportInstance_SetWriteInterceptor(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -533,7 +544,6 @@ func TestTransportInstance_SetWriteInterceptor(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
@@ -546,7 +556,6 @@ func TestTransportInstance_String(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -565,7 +574,6 @@ func TestTransportInstance_String(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
@@ -580,7 +588,6 @@ func TestTransportInstance_Write(t *testing.T) {
 	type fields struct {
 		readBuffer       []byte
 		writeBuffer      []byte
-		connected        bool
 		transport        *Transport
 		writeInterceptor func(transportInstance *TransportInstance, data []byte)
 	}
@@ -588,21 +595,21 @@ func TestTransportInstance_Write(t *testing.T) {
 		data []byte
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
+		wantErr     bool
 	}{
 		{
 			name: "write it",
-			fields: fields{
-				connected: true,
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
 			},
 		},
 		{
 			name: "write it",
 			fields: fields{
-				connected: true,
 				writeInterceptor: func(transportInstance *TransportInstance, data []byte) {
 					assert.NotNil(t, transportInstance)
 					assert.Equal(t, []byte{1, 2, 3, 4}, data)
@@ -611,6 +618,9 @@ func TestTransportInstance_Write(t *testing.T) {
 			args: args{
 				data: []byte{1, 2, 3, 4},
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -618,10 +628,12 @@ func TestTransportInstance_Write(t *testing.T) {
 			m := &TransportInstance{
 				readBuffer:       tt.fields.readBuffer,
 				writeBuffer:      tt.fields.writeBuffer,
-				connected:        tt.fields.connected,
 				transport:        tt.fields.transport,
 				writeInterceptor: tt.fields.writeInterceptor,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			if err := m.Write(tt.args.data); (err != nil) != tt.wantErr {
 				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
 			}
diff --git a/plc4go/spi/transports/udp/TransportInstance.go b/plc4go/spi/transports/udp/TransportInstance.go
index c99a4a2780..383123ada8 100644
--- a/plc4go/spi/transports/udp/TransportInstance.go
+++ b/plc4go/spi/transports/udp/TransportInstance.go
@@ -24,6 +24,8 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"sync"
+	"sync/atomic"
 
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
@@ -38,9 +40,13 @@ type TransportInstance struct {
 	RemoteAddress  *net.UDPAddr
 	ConnectTimeout uint32
 	SoReUse        bool
-	transport      *Transport
-	udpConn        *net.UDPConn
-	reader         *bufio.Reader
+
+	transport *Transport
+	udpConn   *net.UDPConn
+	reader    *bufio.Reader
+
+	connected        atomic.Bool
+	stateChangeMutex sync.RWMutex
 
 	log zerolog.Logger
 }
@@ -62,6 +68,11 @@ func (m *TransportInstance) Connect() error {
 }
 
 func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
+	if m.connected.Load() {
+		return errors.New("already connected")
+	}
+	m.stateChangeMutex.Lock()
+	defer m.stateChangeMutex.Unlock()
 	// If we haven't provided a local address, have the system figure it out by dialing
 	// the remote address and then using that connections local address as local address.
 	if m.LocalAddress == nil && m.RemoteAddress != nil {
@@ -107,26 +118,33 @@ func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
 	}()*/
 	m.reader = bufio.NewReader(m.udpConn)
 
+	m.connected.Store(true)
+
 	return nil
 }
 
 func (m *TransportInstance) Close() error {
-	if m.udpConn == nil {
+	m.stateChangeMutex.Lock()
+	defer m.stateChangeMutex.Unlock()
+	if !m.connected.Load() {
 		return nil
 	}
 	err := m.udpConn.Close()
 	if err != nil {
 		return errors.Wrap(err, "error closing connection")
 	}
-	m.udpConn = nil
+	m.connected.Store(false)
 	return nil
 }
 
 func (m *TransportInstance) IsConnected() bool {
-	return m.udpConn != nil
+	return m.connected.Load()
 }
 
 func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
+	if !m.IsConnected() {
+		return 0, errors.New("working on a unconnected connection")
+	}
 	if m.reader == nil {
 		return 0, nil
 	}
@@ -135,6 +153,9 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
 }
 
 func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
+	if !m.IsConnected() {
+		return errors.New("working on a unconnected connection")
+	}
 	nBytes := uint32(1)
 	for {
 		_bytes, err := m.PeekReadableBytes(nBytes)
@@ -149,15 +170,15 @@ func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, re
 }
 
 func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
-	if m.reader == nil {
-		return nil, errors.New("error peeking from transport. No reader available")
+	if !m.IsConnected() {
+		return nil, errors.New("working on a unconnected connection")
 	}
 	return m.reader.Peek(int(numBytes))
 }
 
 func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {
-	if m.reader == nil {
-		return nil, errors.New("error reading from transport. No reader available")
+	if !m.IsConnected() {
+		return nil, errors.New("working on a unconnected connection")
 	}
 	data := make([]byte, numBytes)
 	for i := uint32(0); i < numBytes; i++ {
@@ -171,8 +192,8 @@ func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {
 }
 
 func (m *TransportInstance) Write(data []byte) error {
-	if m.udpConn == nil {
-		return errors.New("error writing to transport. No writer available")
+	if !m.IsConnected() {
+		return errors.New("working on a unconnected connection")
 	}
 	var num int
 	var err error
diff --git a/plc4go/spi/transports/udp/TransportInstance_test.go b/plc4go/spi/transports/udp/TransportInstance_test.go
index 7424fa472b..c31cf7a26e 100644
--- a/plc4go/spi/transports/udp/TransportInstance_test.go
+++ b/plc4go/spi/transports/udp/TransportInstance_test.go
@@ -70,18 +70,19 @@ func TestTransportInstance_Close(t *testing.T) {
 		reader         *bufio.Reader
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		wantErr bool
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, instance *TransportInstance)
+		wantErr     bool
 	}{
-		{
-			name: "close it",
-		},
 		{
 			name: "close it failing",
 			fields: fields{
 				udpConn: &net.UDPConn{},
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			wantErr: true,
 		},
 		{
@@ -96,6 +97,9 @@ func TestTransportInstance_Close(t *testing.T) {
 					return listener.(*net.UDPConn)
 				}(),
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -109,6 +113,9 @@ func TestTransportInstance_Close(t *testing.T) {
 				udpConn:        tt.fields.udpConn,
 				reader:         tt.fields.reader,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			if err := m.Close(); (err != nil) != tt.wantErr {
 				t.Errorf("Close() error = %v, wantErr %v", err, tt.wantErr)
 			}
@@ -303,10 +310,11 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 		until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
+		wantErr     bool
 	}{
 		{
 			name:    "do it",
@@ -322,6 +330,9 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 					return pos < 2
 				},
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -335,6 +346,9 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 				udpConn:        tt.fields.udpConn,
 				reader:         tt.fields.reader,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			if err := m.FillBuffer(tt.args.until); (err != nil) != tt.wantErr {
 				t.Errorf("FillBuffer() error = %v, wantErr %v", err, tt.wantErr)
 			}
@@ -353,20 +367,27 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
 		reader         *bufio.Reader
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		want    uint32
-		wantErr bool
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, instance *TransportInstance)
+		want        uint32
+		wantErr     bool
 	}{
 		{
 			name: "get em",
 			fields: fields{
 				reader: bufio.NewReader(bytes.NewReader([]byte{1, 2, 3, 4})),
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			want: 4,
 		},
 		{
 			name: "get em (no reader)",
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -380,6 +401,9 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
 				udpConn:        tt.fields.udpConn,
 				reader:         tt.fields.reader,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			got, err := m.GetNumBytesAvailableInBuffer()
 			if (err != nil) != tt.wantErr {
 				t.Errorf("GetNumBytesAvailableInBuffer() error = %v, wantErr %v", err, tt.wantErr)
@@ -443,17 +467,21 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
 		numBytes uint32
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		want    []byte
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
+		want        []byte
+		wantErr     bool
 	}{
 		{
 			name: "peek it",
 			fields: fields{
 				reader: bufio.NewReader(bytes.NewReader([]byte{1, 2, 3, 4})),
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			want: []byte{},
 		},
 		{
@@ -464,10 +492,13 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
 			args: args{
 				numBytes: 3,
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			want: []byte{1, 2, 3},
 		},
 		{
-			name:    "peek it (no reader)",
+			name:    "peek it (not connected)",
 			wantErr: true,
 		},
 	}
@@ -482,6 +513,9 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
 				udpConn:        tt.fields.udpConn,
 				reader:         tt.fields.reader,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			got, err := m.PeekReadableBytes(tt.args.numBytes)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("PeekReadableBytes() error = %v, wantErr %v", err, tt.wantErr)
@@ -508,17 +542,21 @@ func TestTransportInstance_Read(t *testing.T) {
 		numBytes uint32
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		want    []byte
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		manipulator func(t *testing.T, instance *TransportInstance)
+		want        []byte
+		wantErr     bool
 	}{
 		{
 			name: "read it",
 			fields: fields{
 				reader: bufio.NewReader(bytes.NewReader([]byte{1, 2, 3, 4})),
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			want: []byte{},
 		},
 		{
@@ -529,6 +567,9 @@ func TestTransportInstance_Read(t *testing.T) {
 			args: args{
 				numBytes: 3,
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			want: []byte{1, 2, 3},
 		},
 		{
@@ -539,10 +580,13 @@ func TestTransportInstance_Read(t *testing.T) {
 			args: args{
 				numBytes: 5,
 			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			wantErr: true,
 		},
 		{
-			name:    "read it (no reader available)",
+			name:    "read it (not connected)",
 			wantErr: true,
 		},
 	}
@@ -557,6 +601,9 @@ func TestTransportInstance_Read(t *testing.T) {
 				udpConn:        tt.fields.udpConn,
 				reader:         tt.fields.reader,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			got, err := m.Read(tt.args.numBytes)
 			if (err != nil) != tt.wantErr {
 				t.Errorf("Read() error = %v, wantErr %v", err, tt.wantErr)
@@ -629,10 +676,12 @@ func TestTransportInstance_Write(t *testing.T) {
 		data []byte
 	}
 	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		wantErr bool
+		name        string
+		fields      fields
+		args        args
+		setup       func(t *testing.T, fields *fields, args *args)
+		manipulator func(t *testing.T, instance *TransportInstance)
+		wantErr     bool
 	}{
 		{
 			name:    "write it (no con)",
@@ -640,42 +689,45 @@ func TestTransportInstance_Write(t *testing.T) {
 		},
 		{
 			name: "write it",
-			fields: fields{
-				udpConn: func() *net.UDPConn {
-					listener, err := nettest.NewLocalPacketListener("udp")
-					require.NoError(t, err)
-					t.Cleanup(func() {
-						assert.NoError(t, listener.Close())
-					})
-					udp, err := net.DialUDP("udp", nil, listener.LocalAddr().(*net.UDPAddr))
-					require.NoError(t, err)
-					return udp
-				}(),
+			setup: func(t *testing.T, fields *fields, args *args) {
+				listener, err := nettest.NewLocalPacketListener("udp")
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, listener.Close())
+				})
+				udp, err := net.DialUDP("udp", nil, listener.LocalAddr().(*net.UDPAddr))
+				require.NoError(t, err)
+				fields.udpConn = udp
+			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
 			},
 		},
 		{
 			name: "write it with remote",
-			fields: func() fields {
+			setup: func(t *testing.T, fields *fields, args *args) {
 				listener, err := nettest.NewLocalPacketListener("udp")
 				require.NoError(t, err)
 				t.Cleanup(func() {
 					assert.NoError(t, listener.Close())
 				})
 				remoteAddress := listener.LocalAddr().(*net.UDPAddr)
-				return fields{
-					RemoteAddress: remoteAddress,
-					udpConn: func() *net.UDPConn {
-						udp, err := net.ListenUDP("udp", nil)
-						require.NoError(t, err)
-						return udp
-					}(),
-				}
-			}(),
+				fields.RemoteAddress = remoteAddress
+				udp, err := net.ListenUDP("udp", nil)
+				require.NoError(t, err)
+				fields.udpConn = udp
+			},
+			manipulator: func(t *testing.T, instance *TransportInstance) {
+				instance.connected.Store(true)
+			},
 			args: args{data: []byte{1, 2, 3, 4}},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields, &tt.args)
+			}
 			m := &TransportInstance{
 				LocalAddress:   tt.fields.LocalAddress,
 				RemoteAddress:  tt.fields.RemoteAddress,
@@ -685,6 +737,9 @@ func TestTransportInstance_Write(t *testing.T) {
 				udpConn:        tt.fields.udpConn,
 				reader:         tt.fields.reader,
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, m)
+			}
 			if err := m.Write(tt.args.data); (err != nil) != tt.wantErr {
 				t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
 			}
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
index 6406964125..2c4df42d6a 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
@@ -33,6 +33,7 @@ import (
 type DefaultBufferedTransportInstanceRequirements interface {
 	GetReader() transports.ExtendedReader
 	Connect() error
+	IsConnected() bool
 }
 
 type DefaultBufferedTransportInstance interface {
@@ -77,6 +78,9 @@ func (m *defaultBufferedTransportInstance) ConnectWithContext(ctx context.Contex
 }
 
 func (m *defaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
+	if !m.IsConnected() {
+		return 0, errors.New("working on a unconnected connection")
+	}
 	if m.GetReader() == nil {
 		return 0, nil
 	}
@@ -85,6 +89,9 @@ func (m *defaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint3
 }
 
 func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
+	if !m.IsConnected() {
+		return errors.New("working on a unconnected connection")
+	}
 	if m.GetReader() == nil {
 		return nil
 	}
@@ -102,6 +109,9 @@ func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, curre
 }
 
 func (m *defaultBufferedTransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
+	if !m.IsConnected() {
+		return nil, errors.New("working on a unconnected connection")
+	}
 	if m.GetReader() == nil {
 		return nil, errors.New("error peeking from transport. No reader available")
 	}
@@ -109,6 +119,9 @@ func (m *defaultBufferedTransportInstance) PeekReadableBytes(numBytes uint32) ([
 }
 
 func (m *defaultBufferedTransportInstance) Read(numBytes uint32) ([]byte, error) {
+	if !m.IsConnected() {
+		return nil, errors.New("working on a unconnected connection")
+	}
 	if m.GetReader() == nil {
 		return nil, errors.New("error reading from transport. No reader available")
 	}
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
index 9d13f28f32..c82ef148d0 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
@@ -135,6 +135,7 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
 				expect := requirements.EXPECT()
 				expect.GetReader().Return(nil)
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 		},
@@ -145,7 +146,9 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 			}},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect := requirements.EXPECT()
+				expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 		},
@@ -156,7 +159,9 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
 			}},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect := requirements.EXPECT()
+				expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 			wantErr: true,
@@ -192,7 +197,9 @@ func Test_defaultBufferedTransportInstance_GetNumBytesAvailableInBuffer(t *testi
 			name: "get it without reader",
 			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(nil)
+				expect := requirements.EXPECT()
+				expect.GetReader().Return(nil)
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 		},
@@ -200,7 +207,9 @@ func Test_defaultBufferedTransportInstance_GetNumBytesAvailableInBuffer(t *testi
 			name: "get it with reader",
 			mockSetup: func(t *testing.T, fields *fields) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect := requirements.EXPECT()
+				expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 			want: 2,
@@ -245,7 +254,9 @@ func Test_defaultBufferedTransportInstance_PeekReadableBytes(t *testing.T) {
 			name: "peek it without reader",
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(nil)
+				expect := requirements.EXPECT()
+				expect.IsConnected().Return(true)
+				expect.GetReader().Return(nil)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 			wantErr: true,
@@ -255,7 +266,9 @@ func Test_defaultBufferedTransportInstance_PeekReadableBytes(t *testing.T) {
 			args: args{numBytes: 2},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect := requirements.EXPECT()
+				expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 			want: []byte{0x0, 0x0},
@@ -300,7 +313,9 @@ func Test_defaultBufferedTransportInstance_Read(t *testing.T) {
 			name: "read it without reader",
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(nil)
+				expect := requirements.EXPECT()
+				expect.GetReader().Return(nil)
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 			wantErr: true,
@@ -310,7 +325,9 @@ func Test_defaultBufferedTransportInstance_Read(t *testing.T) {
 			args: args{numBytes: 2},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect := requirements.EXPECT()
+				expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 			want: []byte{0x0, 0x0},
@@ -320,7 +337,9 @@ func Test_defaultBufferedTransportInstance_Read(t *testing.T) {
 			args: args{numBytes: 2},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
 				requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
-				requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0})))
+				expect := requirements.EXPECT()
+				expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0})))
+				expect.IsConnected().Return(true)
 				fields.DefaultBufferedTransportInstanceRequirements = requirements
 			},
 			wantErr: true,
diff --git a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
index c5bb927ebf..bf58f77265 100644
--- a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
+++ b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
@@ -123,6 +123,47 @@ func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) RunAn
 	return _c
 }
 
+// IsConnected provides a mock function with given fields:
+func (_m *MockDefaultBufferedTransportInstanceRequirements) IsConnected() bool {
+	ret := _m.Called()
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func() bool); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsConnected'
+type MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call struct {
+	*mock.Call
+}
+
+// IsConnected is a helper method to define mock.On call
+func (_e *MockDefaultBufferedTransportInstanceRequirements_Expecter) IsConnected() *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call {
+	return &MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call{Call: _e.mock.On("IsConnected")}
+}
+
+func (_c *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call) Run(run func()) *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call) Return(_a0 bool) *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call) RunAndReturn(run func() bool) *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // NewMockDefaultBufferedTransportInstanceRequirements creates a new instance of MockDefaultBufferedTransportInstanceRequirements. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
 // The first argument is typically a *testing.T value.
 func NewMockDefaultBufferedTransportInstanceRequirements(t interface {