You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/20 07:24:21 UTC
[plc4x] branch develop updated: fix(plc4go/cbus): ensure TransportInstances are properly synced
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 73317f81ae fix(plc4go/cbus): ensure TransportInstances are properly synced
73317f81ae is described below
commit 73317f81ae3a410753775d713577378868b03e8c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 20 09:24:14 2023 +0200
fix(plc4go/cbus): ensure TransportInstances are properly synced
---
plc4go/spi/transports/pcap/TransportInstance.go | 3 +
plc4go/spi/transports/serial/TransportInstance.go | 27 +++-
plc4go/spi/transports/tcp/TransportInstance.go | 2 +-
plc4go/spi/transports/test/TransportInstance.go | 36 +++--
.../spi/transports/test/TransportInstance_test.go | 170 +++++++++++----------
plc4go/spi/transports/udp/TransportInstance.go | 45 ++++--
.../spi/transports/udp/TransportInstance_test.go | 157 ++++++++++++-------
.../utils/DefaultBufferedTransportInstance.go | 13 ++
.../utils/DefaultBufferedTransportInstance_test.go | 37 +++--
...ltBufferedTransportInstanceRequirements_test.go | 41 +++++
10 files changed, 360 insertions(+), 171 deletions(-)
diff --git a/plc4go/spi/transports/pcap/TransportInstance.go b/plc4go/spi/transports/pcap/TransportInstance.go
index 20a0c7e9df..3f396d9e93 100644
--- a/plc4go/spi/transports/pcap/TransportInstance.go
+++ b/plc4go/spi/transports/pcap/TransportInstance.go
@@ -167,6 +167,9 @@ func (m *TransportInstance) IsConnected() bool {
}
func (m *TransportInstance) Write(_ []byte) error {
+ if !m.connected.Load() {
+ return errors.New("error writing to transport. Not connected") // same wording the tcp and serial transports use for a write on a closed instance
+ }
return errors.New("Write to pcap not supported")
}
diff --git a/plc4go/spi/transports/serial/TransportInstance.go b/plc4go/spi/transports/serial/TransportInstance.go
index 33afeaf212..afa5e90322 100644
--- a/plc4go/spi/transports/serial/TransportInstance.go
+++ b/plc4go/spi/transports/serial/TransportInstance.go
@@ -23,6 +23,8 @@ import (
"bufio"
"fmt"
"io"
+ "sync"
+ "sync/atomic"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
@@ -35,12 +37,17 @@ import (
type TransportInstance struct {
transportUtils.DefaultBufferedTransportInstance
+
SerialPortName string
BaudRate uint
ConnectTimeout uint32
- transport *Transport
- serialPort io.ReadWriteCloser
- reader *bufio.Reader
+
+ connected atomic.Bool
+ stateChangeMutex sync.Mutex
+
+ transport *Transport
+ serialPort io.ReadWriteCloser
+ reader *bufio.Reader
log zerolog.Logger
}
@@ -59,6 +66,12 @@ func NewTransportInstance(serialPortName string, baudRate uint, connectTimeout u
}
func (m *TransportInstance) Connect() error {
+ m.stateChangeMutex.Lock()
+ defer m.stateChangeMutex.Unlock()
+ if m.connected.Load() {
+ return errors.New("Already connected")
+ }
+
var err error
config := serial.OpenOptions{PortName: m.SerialPortName, BaudRate: m.BaudRate, DataBits: 8, StopBits: 1, MinimumReadSize: 0, InterCharacterTimeout: 100 /*, RTSCTSFlowControl: true*/}
m.serialPort, err = serial.Open(config)
@@ -80,6 +93,9 @@ func (m *TransportInstance) Connect() error {
}
func (m *TransportInstance) Close() error {
+ m.stateChangeMutex.Lock()
+ defer m.stateChangeMutex.Unlock()
+
if m.serialPort == nil {
return nil
}
@@ -88,6 +104,8 @@ func (m *TransportInstance) Close() error {
return errors.Wrap(err, "error closing serial port")
}
m.serialPort = nil
+
+ m.connected.Store(false)
return nil
}
@@ -96,6 +114,9 @@ func (m *TransportInstance) IsConnected() bool {
}
func (m *TransportInstance) Write(data []byte) error {
+ if !m.connected.Load() {
+ return errors.New("error writing to transport. Not connected")
+ }
if m.serialPort == nil {
return errors.New("error writing to transport. No writer available")
}
diff --git a/plc4go/spi/transports/tcp/TransportInstance.go b/plc4go/spi/transports/tcp/TransportInstance.go
index 3e5e7ff705..df2ffc8be8 100644
--- a/plc4go/spi/transports/tcp/TransportInstance.go
+++ b/plc4go/spi/transports/tcp/TransportInstance.go
@@ -112,7 +112,7 @@ func (m *TransportInstance) IsConnected() bool {
func (m *TransportInstance) Write(data []byte) error {
if !m.connected.Load() {
- return errors.New("error writing to transport. No writer available")
+ return errors.New("error writing to transport. Not connected")
}
num, err := m.tcpConn.Write(data)
if err != nil {
diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go
index b01d94ef3b..936f4dae40 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -26,6 +26,7 @@ import (
"encoding/hex"
"math"
"sync"
+ "sync/atomic"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
@@ -37,11 +38,11 @@ import (
type TransportInstance struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
dataMutex sync.RWMutex
+ connected atomic.Bool
stateChangeMutex sync.RWMutex
log zerolog.Logger
@@ -51,7 +52,6 @@ func NewTransportInstance(transport *Transport, _options ...options.WithOption)
return &TransportInstance{
readBuffer: []byte{},
writeBuffer: []byte{},
- connected: false,
transport: transport,
log: options.ExtractCustomLogger(_options...),
@@ -61,12 +61,12 @@ func NewTransportInstance(transport *Transport, _options ...options.WithOption)
func (m *TransportInstance) Connect() error {
m.stateChangeMutex.Lock()
defer m.stateChangeMutex.Unlock()
- if m.connected {
+ if m.connected.Load() {
m.log.Warn().Msg("already connected")
return nil
}
m.log.Trace().Msg("Connect")
- m.connected = true
+ m.connected.Store(true)
return nil
}
@@ -77,15 +77,16 @@ func (m *TransportInstance) ConnectWithContext(_ context.Context) error {
func (m *TransportInstance) Close() error {
m.stateChangeMutex.Lock()
defer m.stateChangeMutex.Unlock()
+ if !m.connected.Load() {
+ return nil
+ }
m.log.Trace().Msg("Close")
- m.connected = false
+ m.connected.Store(false) // Close must clear the flag (the replaced line assigned false); storing true would leave IsConnected reporting a live connection
return nil
}
func (m *TransportInstance) IsConnected() bool {
- m.stateChangeMutex.RLock()
- defer m.stateChangeMutex.RUnlock()
- return m.connected
+ return m.connected.Load()
}
func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
@@ -101,7 +102,7 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
if !m.IsConnected() {
- panic(errors.New("working on a unconnected connection"))
+ return errors.New("working on a unconnected connection")
}
m.log.Trace().Msg("Fill the buffer")
nBytes := uint32(1)
@@ -125,7 +126,7 @@ func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, re
func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
if !m.IsConnected() {
- panic(errors.New("working on a unconnected connection"))
+ return nil, errors.New("working on a unconnected connection")
}
m.dataMutex.RLock()
defer m.dataMutex.RUnlock()
@@ -144,7 +145,7 @@ func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {
if !m.IsConnected() {
- panic(errors.New("working on a unconnected connection"))
+ return nil, errors.New("working on a unconnected connection")
}
m.dataMutex.Lock()
defer m.dataMutex.Unlock()
@@ -165,7 +166,7 @@ func (m *TransportInstance) SetWriteInterceptor(writeInterceptor func(transportI
func (m *TransportInstance) Write(data []byte) error {
if !m.IsConnected() {
- panic(errors.New("working on a unconnected connection"))
+ return errors.New("working on a unconnected connection")
}
if m.writeInterceptor != nil {
m.log.Trace().Msgf("Passing data to write interceptor\n%s", hex.Dump(data))
@@ -180,7 +181,8 @@ func (m *TransportInstance) Write(data []byte) error {
func (m *TransportInstance) FillReadBuffer(data []byte) {
if !m.IsConnected() {
- panic(errors.New("working on a unconnected connection"))
+ m.log.Error().Msg("working on a unconnected connection")
+ return
}
m.dataMutex.Lock()
defer m.dataMutex.Unlock()
@@ -190,7 +192,8 @@ func (m *TransportInstance) FillReadBuffer(data []byte) {
func (m *TransportInstance) GetNumDrainableBytes() uint32 {
if !m.IsConnected() {
- panic(errors.New("working on a unconnected connection"))
+ m.log.Error().Msg("working on a unconnected connection")
+ return 0
}
m.dataMutex.RLock()
defer m.dataMutex.RUnlock()
@@ -200,7 +203,8 @@ func (m *TransportInstance) GetNumDrainableBytes() uint32 {
func (m *TransportInstance) DrainWriteBuffer(numBytes uint32) []byte {
if !m.IsConnected() {
- panic(errors.New("working on a unconnected connection"))
+ m.log.Error().Msg("working on a unconnected connection")
+ return nil
}
m.dataMutex.Lock()
defer m.dataMutex.Unlock()
@@ -211,5 +215,5 @@ func (m *TransportInstance) DrainWriteBuffer(numBytes uint32) []byte {
}
func (m *TransportInstance) String() string {
- return "test" //TODO: maybe use plc4xgen
+ return "test"
}
diff --git a/plc4go/spi/transports/test/TransportInstance_test.go b/plc4go/spi/transports/test/TransportInstance_test.go
index 78cbf318b3..fd903d2f28 100644
--- a/plc4go/spi/transports/test/TransportInstance_test.go
+++ b/plc4go/spi/transports/test/TransportInstance_test.go
@@ -58,7 +58,6 @@ func TestTransportInstance_Close(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -76,7 +75,6 @@ func TestTransportInstance_Close(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
@@ -91,7 +89,6 @@ func TestTransportInstance_Connect(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -109,7 +106,6 @@ func TestTransportInstance_Connect(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
@@ -124,7 +120,6 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -146,7 +141,6 @@ func TestTransportInstance_ConnectWithContext(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
@@ -161,7 +155,6 @@ func TestTransportInstance_DrainWriteBuffer(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -169,15 +162,16 @@ func TestTransportInstance_DrainWriteBuffer(t *testing.T) {
numBytes uint32
}
tests := []struct {
- name string
- fields fields
- args args
- want []byte
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
+ want []byte
}{
{
name: "drain it",
- fields: fields{
- connected: true,
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
},
},
}
@@ -186,10 +180,12 @@ func TestTransportInstance_DrainWriteBuffer(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
if got := m.DrainWriteBuffer(tt.args.numBytes); !assert.Equal(t, tt.want, got) {
t.Errorf("DrainWriteBuffer() = %v, want %v", got, tt.want)
}
@@ -201,7 +197,6 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -209,27 +204,27 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
}
tests := []struct {
- name string
- fields fields
- args args
- wantErr bool
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
+ wantErr bool
}{
{
name: "fill it (errors)",
- fields: fields{
- connected: true,
- },
args: args{
until: func(pos uint, currentByte byte, reader transports.ExtendedReader) bool {
return pos < 3
},
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
wantErr: true,
},
{
name: "fill it",
fields: fields{
- connected: true,
readBuffer: []byte{1, 2, 3, 4},
},
args: args{
@@ -237,6 +232,9 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
return pos < 3
},
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
},
}
for _, tt := range tests {
@@ -244,10 +242,12 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
if err := m.FillBuffer(tt.args.until); (err != nil) != tt.wantErr {
t.Errorf("FillBuffer() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -259,7 +259,6 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -267,19 +266,22 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) {
data []byte
}
tests := []struct {
- name string
- fields fields
- args args
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
}{
{
name: "fill it",
fields: fields{
- connected: true,
readBuffer: []byte{1, 2, 3, 4},
},
args: args{
data: []byte{1, 2, 3, 4},
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
},
}
for _, tt := range tests {
@@ -287,10 +289,12 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
m.FillReadBuffer(tt.args.data)
})
}
@@ -300,28 +304,30 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
tests := []struct {
- name string
- fields fields
- want uint32
- wantErr bool
+ name string
+ fields fields
+ manipulator func(t *testing.T, instance *TransportInstance)
+ want uint32
+ wantErr bool
}{
{
name: "get it",
- fields: fields{
- connected: true,
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
},
},
{
name: "get it too",
fields: fields{
- connected: true,
readBuffer: []byte{1, 2, 3, 4},
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
want: 4,
},
}
@@ -330,10 +336,12 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
got, err := m.GetNumBytesAvailableInBuffer()
if (err != nil) != tt.wantErr {
t.Errorf("GetNumBytesAvailableInBuffer() error = %v, wantErr %v", err, tt.wantErr)
@@ -350,19 +358,19 @@ func TestTransportInstance_GetNumDrainableBytes(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
tests := []struct {
- name string
- fields fields
- want uint32
+ name string
+ fields fields
+ manipulator func(t *testing.T, instance *TransportInstance)
+ want uint32
}{
{
name: "get it",
- fields: fields{
- connected: true,
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
},
},
}
@@ -371,10 +379,12 @@ func TestTransportInstance_GetNumDrainableBytes(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
if got := m.GetNumDrainableBytes(); got != tt.want {
t.Errorf("GetNumDrainableBytes() = %v, want %v", got, tt.want)
}
@@ -386,7 +396,6 @@ func TestTransportInstance_IsConnected(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -404,7 +413,6 @@ func TestTransportInstance_IsConnected(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
@@ -419,7 +427,6 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -427,16 +434,17 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
numBytes uint32
}
tests := []struct {
- name string
- fields fields
- args args
- want []byte
- wantErr bool
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
+ want []byte
+ wantErr bool
}{
{
name: "peek it",
- fields: fields{
- connected: true,
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
},
},
}
@@ -445,10 +453,12 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
got, err := m.PeekReadableBytes(tt.args.numBytes)
if (err != nil) != tt.wantErr {
t.Errorf("PeekReadableBytes() error = %v, wantErr %v", err, tt.wantErr)
@@ -465,7 +475,6 @@ func TestTransportInstance_Read(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -473,16 +482,17 @@ func TestTransportInstance_Read(t *testing.T) {
numBytes uint32
}
tests := []struct {
- name string
- fields fields
- args args
- want []byte
- wantErr bool
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
+ want []byte
+ wantErr bool
}{
{
name: "read it",
- fields: fields{
- connected: true,
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
},
wantErr: true,
},
@@ -492,10 +502,12 @@ func TestTransportInstance_Read(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
got, err := m.Read(tt.args.numBytes)
if (err != nil) != tt.wantErr {
t.Errorf("Read() error = %v, wantErr %v", err, tt.wantErr)
@@ -512,7 +524,6 @@ func TestTransportInstance_SetWriteInterceptor(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -533,7 +544,6 @@ func TestTransportInstance_SetWriteInterceptor(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
@@ -546,7 +556,6 @@ func TestTransportInstance_String(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -565,7 +574,6 @@ func TestTransportInstance_String(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
@@ -580,7 +588,6 @@ func TestTransportInstance_Write(t *testing.T) {
type fields struct {
readBuffer []byte
writeBuffer []byte
- connected bool
transport *Transport
writeInterceptor func(transportInstance *TransportInstance, data []byte)
}
@@ -588,21 +595,21 @@ func TestTransportInstance_Write(t *testing.T) {
data []byte
}
tests := []struct {
- name string
- fields fields
- args args
- wantErr bool
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
+ wantErr bool
}{
{
name: "write it",
- fields: fields{
- connected: true,
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
},
},
{
name: "write it",
fields: fields{
- connected: true,
writeInterceptor: func(transportInstance *TransportInstance, data []byte) {
assert.NotNil(t, transportInstance)
assert.Equal(t, []byte{1, 2, 3, 4}, data)
@@ -611,6 +618,9 @@ func TestTransportInstance_Write(t *testing.T) {
args: args{
data: []byte{1, 2, 3, 4},
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
},
}
for _, tt := range tests {
@@ -618,10 +628,12 @@ func TestTransportInstance_Write(t *testing.T) {
m := &TransportInstance{
readBuffer: tt.fields.readBuffer,
writeBuffer: tt.fields.writeBuffer,
- connected: tt.fields.connected,
transport: tt.fields.transport,
writeInterceptor: tt.fields.writeInterceptor,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
if err := m.Write(tt.args.data); (err != nil) != tt.wantErr {
t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
}
diff --git a/plc4go/spi/transports/udp/TransportInstance.go b/plc4go/spi/transports/udp/TransportInstance.go
index c99a4a2780..383123ada8 100644
--- a/plc4go/spi/transports/udp/TransportInstance.go
+++ b/plc4go/spi/transports/udp/TransportInstance.go
@@ -24,6 +24,8 @@ import (
"context"
"fmt"
"net"
+ "sync"
+ "sync/atomic"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
@@ -38,9 +40,13 @@ type TransportInstance struct {
RemoteAddress *net.UDPAddr
ConnectTimeout uint32
SoReUse bool
- transport *Transport
- udpConn *net.UDPConn
- reader *bufio.Reader
+
+ transport *Transport
+ udpConn *net.UDPConn
+ reader *bufio.Reader
+
+ connected atomic.Bool
+ stateChangeMutex sync.RWMutex
log zerolog.Logger
}
@@ -62,6 +68,11 @@ func (m *TransportInstance) Connect() error {
}
func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
+ m.stateChangeMutex.Lock() // take the lock before inspecting connected so two concurrent callers cannot both pass the check
+ defer m.stateChangeMutex.Unlock()
+ if m.connected.Load() {
+ return errors.New("already connected")
+ }
// If we haven't provided a local address, have the system figure it out by dialing
// the remote address and then using that connections local address as local address.
if m.LocalAddress == nil && m.RemoteAddress != nil {
@@ -107,26 +118,33 @@ func (m *TransportInstance) ConnectWithContext(ctx context.Context) error {
}()*/
m.reader = bufio.NewReader(m.udpConn)
+ m.connected.Store(true)
+
return nil
}
func (m *TransportInstance) Close() error {
- if m.udpConn == nil {
+ m.stateChangeMutex.Lock()
+ defer m.stateChangeMutex.Unlock()
+ if !m.connected.Load() {
return nil
}
err := m.udpConn.Close()
if err != nil {
return errors.Wrap(err, "error closing connection")
}
- m.udpConn = nil
+ m.connected.Store(false)
return nil
}
func (m *TransportInstance) IsConnected() bool {
- return m.udpConn != nil
+ return m.connected.Load()
}
func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
+ if !m.IsConnected() {
+ return 0, errors.New("working on a unconnected connection")
+ }
if m.reader == nil {
return 0, nil
}
@@ -135,6 +153,9 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
}
func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
+ if !m.IsConnected() {
+ return errors.New("working on a unconnected connection")
+ }
nBytes := uint32(1)
for {
_bytes, err := m.PeekReadableBytes(nBytes)
@@ -149,15 +170,15 @@ func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, re
}
func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
- if m.reader == nil {
- return nil, errors.New("error peeking from transport. No reader available")
+ if !m.IsConnected() {
+ return nil, errors.New("working on a unconnected connection")
}
return m.reader.Peek(int(numBytes))
}
func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {
- if m.reader == nil {
- return nil, errors.New("error reading from transport. No reader available")
+ if !m.IsConnected() {
+ return nil, errors.New("working on a unconnected connection")
}
data := make([]byte, numBytes)
for i := uint32(0); i < numBytes; i++ {
@@ -171,8 +192,8 @@ func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {
}
func (m *TransportInstance) Write(data []byte) error {
- if m.udpConn == nil {
- return errors.New("error writing to transport. No writer available")
+ if !m.IsConnected() {
+ return errors.New("working on a unconnected connection")
}
var num int
var err error
diff --git a/plc4go/spi/transports/udp/TransportInstance_test.go b/plc4go/spi/transports/udp/TransportInstance_test.go
index 7424fa472b..c31cf7a26e 100644
--- a/plc4go/spi/transports/udp/TransportInstance_test.go
+++ b/plc4go/spi/transports/udp/TransportInstance_test.go
@@ -70,18 +70,19 @@ func TestTransportInstance_Close(t *testing.T) {
reader *bufio.Reader
}
tests := []struct {
- name string
- fields fields
- wantErr bool
+ name string
+ fields fields
+ manipulator func(t *testing.T, instance *TransportInstance)
+ wantErr bool
}{
- {
- name: "close it",
- },
{
name: "close it failing",
fields: fields{
udpConn: &net.UDPConn{},
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
wantErr: true,
},
{
@@ -96,6 +97,9 @@ func TestTransportInstance_Close(t *testing.T) {
return listener.(*net.UDPConn)
}(),
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
},
}
for _, tt := range tests {
@@ -109,6 +113,9 @@ func TestTransportInstance_Close(t *testing.T) {
udpConn: tt.fields.udpConn,
reader: tt.fields.reader,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
if err := m.Close(); (err != nil) != tt.wantErr {
t.Errorf("Close() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -303,10 +310,11 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool
}
tests := []struct {
- name string
- fields fields
- args args
- wantErr bool
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
+ wantErr bool
}{
{
name: "do it",
@@ -322,6 +330,9 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
return pos < 2
},
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
},
}
for _, tt := range tests {
@@ -335,6 +346,9 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
udpConn: tt.fields.udpConn,
reader: tt.fields.reader,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
if err := m.FillBuffer(tt.args.until); (err != nil) != tt.wantErr {
t.Errorf("FillBuffer() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -353,20 +367,27 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
reader *bufio.Reader
}
tests := []struct {
- name string
- fields fields
- want uint32
- wantErr bool
+ name string
+ fields fields
+ manipulator func(t *testing.T, instance *TransportInstance)
+ want uint32
+ wantErr bool
}{
{
name: "get em",
fields: fields{
reader: bufio.NewReader(bytes.NewReader([]byte{1, 2, 3, 4})),
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
want: 4,
},
{
name: "get em (no reader)",
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
},
}
for _, tt := range tests {
@@ -380,6 +401,9 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
udpConn: tt.fields.udpConn,
reader: tt.fields.reader,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
got, err := m.GetNumBytesAvailableInBuffer()
if (err != nil) != tt.wantErr {
t.Errorf("GetNumBytesAvailableInBuffer() error = %v, wantErr %v", err, tt.wantErr)
@@ -443,17 +467,21 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
numBytes uint32
}
tests := []struct {
- name string
- fields fields
- args args
- want []byte
- wantErr bool
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
+ want []byte
+ wantErr bool
}{
{
name: "peek it",
fields: fields{
reader: bufio.NewReader(bytes.NewReader([]byte{1, 2, 3, 4})),
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
want: []byte{},
},
{
@@ -464,10 +492,13 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
args: args{
numBytes: 3,
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
want: []byte{1, 2, 3},
},
{
- name: "peek it (no reader)",
+ name: "peek it (not connected)",
wantErr: true,
},
}
@@ -482,6 +513,9 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
udpConn: tt.fields.udpConn,
reader: tt.fields.reader,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
got, err := m.PeekReadableBytes(tt.args.numBytes)
if (err != nil) != tt.wantErr {
t.Errorf("PeekReadableBytes() error = %v, wantErr %v", err, tt.wantErr)
@@ -508,17 +542,21 @@ func TestTransportInstance_Read(t *testing.T) {
numBytes uint32
}
tests := []struct {
- name string
- fields fields
- args args
- want []byte
- wantErr bool
+ name string
+ fields fields
+ args args
+ manipulator func(t *testing.T, instance *TransportInstance)
+ want []byte
+ wantErr bool
}{
{
name: "read it",
fields: fields{
reader: bufio.NewReader(bytes.NewReader([]byte{1, 2, 3, 4})),
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
want: []byte{},
},
{
@@ -529,6 +567,9 @@ func TestTransportInstance_Read(t *testing.T) {
args: args{
numBytes: 3,
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
want: []byte{1, 2, 3},
},
{
@@ -539,10 +580,13 @@ func TestTransportInstance_Read(t *testing.T) {
args: args{
numBytes: 5,
},
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
wantErr: true,
},
{
- name: "read it (no reader available)",
+ name: "read it (not connected)",
wantErr: true,
},
}
@@ -557,6 +601,9 @@ func TestTransportInstance_Read(t *testing.T) {
udpConn: tt.fields.udpConn,
reader: tt.fields.reader,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
got, err := m.Read(tt.args.numBytes)
if (err != nil) != tt.wantErr {
t.Errorf("Read() error = %v, wantErr %v", err, tt.wantErr)
@@ -629,10 +676,12 @@ func TestTransportInstance_Write(t *testing.T) {
data []byte
}
tests := []struct {
- name string
- fields fields
- args args
- wantErr bool
+ name string
+ fields fields
+ args args
+ setup func(t *testing.T, fields *fields, args *args)
+ manipulator func(t *testing.T, instance *TransportInstance)
+ wantErr bool
}{
{
name: "write it (no con)",
@@ -640,42 +689,45 @@ func TestTransportInstance_Write(t *testing.T) {
},
{
name: "write it",
- fields: fields{
- udpConn: func() *net.UDPConn {
- listener, err := nettest.NewLocalPacketListener("udp")
- require.NoError(t, err)
- t.Cleanup(func() {
- assert.NoError(t, listener.Close())
- })
- udp, err := net.DialUDP("udp", nil, listener.LocalAddr().(*net.UDPAddr))
- require.NoError(t, err)
- return udp
- }(),
+ setup: func(t *testing.T, fields *fields, args *args) {
+ listener, err := nettest.NewLocalPacketListener("udp")
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ assert.NoError(t, listener.Close())
+ })
+ udp, err := net.DialUDP("udp", nil, listener.LocalAddr().(*net.UDPAddr))
+ require.NoError(t, err)
+ fields.udpConn = udp
+ },
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
},
},
{
name: "write it with remote",
- fields: func() fields {
+ setup: func(t *testing.T, fields *fields, args *args) {
listener, err := nettest.NewLocalPacketListener("udp")
require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, listener.Close())
})
remoteAddress := listener.LocalAddr().(*net.UDPAddr)
- return fields{
- RemoteAddress: remoteAddress,
- udpConn: func() *net.UDPConn {
- udp, err := net.ListenUDP("udp", nil)
- require.NoError(t, err)
- return udp
- }(),
- }
- }(),
+ fields.RemoteAddress = remoteAddress
+ udp, err := net.ListenUDP("udp", nil)
+ require.NoError(t, err)
+ fields.udpConn = udp
+ },
+ manipulator: func(t *testing.T, instance *TransportInstance) {
+ instance.connected.Store(true)
+ },
args: args{data: []byte{1, 2, 3, 4}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields, &tt.args)
+ }
m := &TransportInstance{
LocalAddress: tt.fields.LocalAddress,
RemoteAddress: tt.fields.RemoteAddress,
@@ -685,6 +737,9 @@ func TestTransportInstance_Write(t *testing.T) {
udpConn: tt.fields.udpConn,
reader: tt.fields.reader,
}
+ if tt.manipulator != nil {
+ tt.manipulator(t, m)
+ }
if err := m.Write(tt.args.data); (err != nil) != tt.wantErr {
t.Errorf("Write() error = %v, wantErr %v", err, tt.wantErr)
}
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
index 6406964125..2c4df42d6a 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance.go
@@ -33,6 +33,7 @@ import (
type DefaultBufferedTransportInstanceRequirements interface {
GetReader() transports.ExtendedReader
Connect() error
+ IsConnected() bool
}
type DefaultBufferedTransportInstance interface {
@@ -77,6 +78,9 @@ func (m *defaultBufferedTransportInstance) ConnectWithContext(ctx context.Contex
}
func (m *defaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
+ if !m.IsConnected() {
+ return 0, errors.New("working on a unconnected connection")
+ }
if m.GetReader() == nil {
return 0, nil
}
@@ -85,6 +89,9 @@ func (m *defaultBufferedTransportInstance) GetNumBytesAvailableInBuffer() (uint3
}
func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader transports.ExtendedReader) bool) error {
+ if !m.IsConnected() {
+ return errors.New("working on a unconnected connection")
+ }
if m.GetReader() == nil {
return nil
}
@@ -102,6 +109,9 @@ func (m *defaultBufferedTransportInstance) FillBuffer(until func(pos uint, curre
}
func (m *defaultBufferedTransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
+ if !m.IsConnected() {
+ return nil, errors.New("working on a unconnected connection")
+ }
if m.GetReader() == nil {
return nil, errors.New("error peeking from transport. No reader available")
}
@@ -109,6 +119,9 @@ func (m *defaultBufferedTransportInstance) PeekReadableBytes(numBytes uint32) ([
}
func (m *defaultBufferedTransportInstance) Read(numBytes uint32) ([]byte, error) {
+ if !m.IsConnected() {
+ return nil, errors.New("working on a unconnected connection")
+ }
if m.GetReader() == nil {
return nil, errors.New("error reading from transport. No reader available")
}
diff --git a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
index 9d13f28f32..c82ef148d0 100644
--- a/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
+++ b/plc4go/spi/transports/utils/DefaultBufferedTransportInstance_test.go
@@ -135,6 +135,7 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
expect := requirements.EXPECT()
expect.GetReader().Return(nil)
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
},
@@ -145,7 +146,9 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
}},
mockSetup: func(t *testing.T, fields *fields, args *args) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect := requirements.EXPECT()
+ expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
},
@@ -156,7 +159,9 @@ func Test_defaultBufferedTransportInstance_FillBuffer(t *testing.T) {
}},
mockSetup: func(t *testing.T, fields *fields, args *args) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect := requirements.EXPECT()
+ expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
wantErr: true,
@@ -192,7 +197,9 @@ func Test_defaultBufferedTransportInstance_GetNumBytesAvailableInBuffer(t *testi
name: "get it without reader",
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(nil)
+ expect := requirements.EXPECT()
+ expect.GetReader().Return(nil)
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
},
@@ -200,7 +207,9 @@ func Test_defaultBufferedTransportInstance_GetNumBytesAvailableInBuffer(t *testi
name: "get it with reader",
mockSetup: func(t *testing.T, fields *fields) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect := requirements.EXPECT()
+ expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
want: 2,
@@ -245,7 +254,9 @@ func Test_defaultBufferedTransportInstance_PeekReadableBytes(t *testing.T) {
name: "peek it without reader",
mockSetup: func(t *testing.T, fields *fields, args *args) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(nil)
+ expect := requirements.EXPECT()
+ expect.IsConnected().Return(true)
+ expect.GetReader().Return(nil)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
wantErr: true,
@@ -255,7 +266,9 @@ func Test_defaultBufferedTransportInstance_PeekReadableBytes(t *testing.T) {
args: args{numBytes: 2},
mockSetup: func(t *testing.T, fields *fields, args *args) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect := requirements.EXPECT()
+ expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
want: []byte{0x0, 0x0},
@@ -300,7 +313,9 @@ func Test_defaultBufferedTransportInstance_Read(t *testing.T) {
name: "read it without reader",
mockSetup: func(t *testing.T, fields *fields, args *args) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(nil)
+ expect := requirements.EXPECT()
+ expect.GetReader().Return(nil)
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
wantErr: true,
@@ -310,7 +325,9 @@ func Test_defaultBufferedTransportInstance_Read(t *testing.T) {
args: args{numBytes: 2},
mockSetup: func(t *testing.T, fields *fields, args *args) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect := requirements.EXPECT()
+ expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0, 0x0})))
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
want: []byte{0x0, 0x0},
@@ -320,7 +337,9 @@ func Test_defaultBufferedTransportInstance_Read(t *testing.T) {
args: args{numBytes: 2},
mockSetup: func(t *testing.T, fields *fields, args *args) {
requirements := NewMockDefaultBufferedTransportInstanceRequirements(t)
- requirements.EXPECT().GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0})))
+ expect := requirements.EXPECT()
+ expect.GetReader().Return(bufio.NewReader(bytes.NewReader([]byte{0x0})))
+ expect.IsConnected().Return(true)
fields.DefaultBufferedTransportInstanceRequirements = requirements
},
wantErr: true,
diff --git a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
index c5bb927ebf..bf58f77265 100644
--- a/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
+++ b/plc4go/spi/transports/utils/mock_DefaultBufferedTransportInstanceRequirements_test.go
@@ -123,6 +123,47 @@ func (_c *MockDefaultBufferedTransportInstanceRequirements_GetReader_Call) RunAn
return _c
}
+// IsConnected provides a mock function with given fields:
+func (_m *MockDefaultBufferedTransportInstanceRequirements) IsConnected() bool {
+ ret := _m.Called()
+
+ var r0 bool
+ if rf, ok := ret.Get(0).(func() bool); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(bool)
+ }
+
+ return r0
+}
+
+// MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsConnected'
+type MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call struct {
+ *mock.Call
+}
+
+// IsConnected is a helper method to define mock.On call
+func (_e *MockDefaultBufferedTransportInstanceRequirements_Expecter) IsConnected() *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call {
+ return &MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call{Call: _e.mock.On("IsConnected")}
+}
+
+func (_c *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call) Run(run func()) *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call {
+ _c.Call.Run(func(args mock.Arguments) {
+ run()
+ })
+ return _c
+}
+
+func (_c *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call) Return(_a0 bool) *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call {
+ _c.Call.Return(_a0)
+ return _c
+}
+
+func (_c *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call) RunAndReturn(run func() bool) *MockDefaultBufferedTransportInstanceRequirements_IsConnected_Call {
+ _c.Call.Return(run)
+ return _c
+}
+
// NewMockDefaultBufferedTransportInstanceRequirements creates a new instance of MockDefaultBufferedTransportInstanceRequirements. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewMockDefaultBufferedTransportInstanceRequirements(t interface {