You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 19:44:58 UTC

[plc4x] branch develop updated (6d23e0cbc5 -> 3f019e94b9)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 6d23e0cbc5 test(plc4go/cbus): fixed defer cleanup messup
     new 3c7acbdfa5 fix(plc4go/spi): test transport instance panics if worked with on disconnected state
     new 3456bcaa54 fix(plc4go/spi): re-order disconnection on Default codec
     new 0784d3786f fix(plc4go/spi): avoid shutting down the shared executor
     new 725471b2ac test(plc4go/cbus): fix for nasty test which only runs on trace now...
     new 3f019e94b9 test(plc4go/cbus): fixed more test issues

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/Connection_test.go            |  8 +++++-
 plc4go/internal/cbus/Discoverer.go                 |  7 +++---
 plc4go/internal/cbus/Discoverer_test.go            | 10 +++++++-
 plc4go/internal/cbus/MessageCodec_test.go          | 14 ++++++++++-
 plc4go/internal/cbus/Reader_test.go                | 14 ++++++-----
 plc4go/spi/default/DefaultCodec.go                 |  4 +--
 .../spi/transactions/RequestTransactionManager.go  | 11 ++++++--
 plc4go/spi/transports/test/TransportInstance.go    | 24 ++++++++++++++++++
 .../spi/transports/test/TransportInstance_test.go  | 29 ++++++++++++++++++++--
 9 files changed, 102 insertions(+), 19 deletions(-)


[plc4x] 05/05: test(plc4go/cbus): fixed more test issues

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 3f019e94b98bfe25230c536bf4b9f29df48307a8
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 21:44:50 2023 +0200

    test(plc4go/cbus): fixed more test issues
---
 plc4go/internal/cbus/Connection_test.go   |  2 +-
 plc4go/internal/cbus/Discoverer.go        |  7 +++----
 plc4go/internal/cbus/Discoverer_test.go   |  9 ++++++++-
 plc4go/internal/cbus/MessageCodec_test.go | 14 +++++++++++++-
 plc4go/internal/cbus/Reader_test.go       | 14 ++++++++------
 5 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index d0924f7674..82a07cefb7 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -976,7 +976,7 @@ func TestConnection_sendReset(t *testing.T) {
 				codec := NewMessageCodec(ti, _options...)
 				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
-					assert.Error(t, codec.Disconnect())
+					assert.NoError(t, codec.Disconnect())
 				})
 				fields.messageCodec = codec
 			},
diff --git a/plc4go/internal/cbus/Discoverer.go b/plc4go/internal/cbus/Discoverer.go
index a3648227a9..3271067ab6 100644
--- a/plc4go/internal/cbus/Discoverer.go
+++ b/plc4go/internal/cbus/Discoverer.go
@@ -216,6 +216,7 @@ func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
 		}
 		defer func() {
 			// Disconnect codec when done
+			d.log.Debug().Msg("Shutting down codec")
 			if err := codec.Disconnect(); err != nil {
 				d.log.Warn().Err(err).Msg("Error disconnecting codec")
 			}
@@ -237,14 +238,12 @@ func (d *Discoverer) createDeviceScanDispatcher(tcpTransportInstance *tcp.Transp
 		// TODO: Make this configurable
 		timeout := time.NewTimer(time.Second * 1)
 		defer utils.CleanupTimer(timeout)
-		timeout.Stop()
 		for start := time.Now(); time.Since(start) < time.Second*5; {
 			timeout.Reset(time.Second * 1)
 			select {
 			case receivedMessage := <-codec.GetDefaultIncomingMessageChannel():
-				if !timeout.Stop() {
-					<-timeout.C
-				}
+				// Cleanup, going to be resetted again
+				utils.CleanupTimer(timeout)
 				cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessage)
 				if !ok {
 					continue
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index b96f45ec6f..544ac2d843 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -29,6 +29,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/plc4x/plc4go/pkg/api/config"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
@@ -190,12 +191,13 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 						t.Error(err)
 						return
 					}
+					t.Logf("writing out")
 					write, err := conn.Write([]byte("x.890050435F434E49454422\r\n"))
 					if err != nil {
 						t.Error(err)
 						return
 					}
-					t.Logf("%d written", write)
+					t.Logf("%d bytes written", write)
 				}()
 				t.Cleanup(func() {
 					if err := listen.Close(); err != nil {
@@ -210,6 +212,9 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 				require.NoError(t, err)
 				instance, err := transport.CreateTransportInstance(*parse, nil, _options...)
 				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, instance.Close())
+				})
 				args.tcpTransportInstance = instance.(*tcp.TransportInstance)
 			},
 		},
@@ -228,7 +233,9 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 				tt.args.callback(t, event)
 			})
 			assert.NotNilf(t, dispatcher, "createDeviceScanDispatcher(%v, func())", tt.args.tcpTransportInstance)
+			t.Log("Calling dispatcher now")
 			dispatcher()
+			t.Log("dispatching done")
 		})
 	}
 }
diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go
index 91db4635ad..e104441a90 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -69,8 +69,9 @@ func TestMessageCodec_Send(t *testing.T) {
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
 				codec := NewMessageCodec(instance, _options...)
+				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
-					assert.Error(t, codec.Disconnect())
+					assert.NoError(t, codec.Disconnect())
 				})
 				fields.DefaultCodec = codec
 			},
@@ -126,6 +127,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
+				require.NoError(t, instance.Connect())
 				codec := NewMessageCodec(instance, _options...)
 				t.Cleanup(func() {
 					assert.Error(t, codec.Disconnect())
@@ -153,6 +155,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
+				require.NoError(t, instance.Connect())
 				instance.FillReadBuffer([]byte("!"))
 				codec := NewMessageCodec(instance, _options...)
 				t.Cleanup(func() {
@@ -175,6 +178,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
+				require.NoError(t, instance.Connect())
 				instance.FillReadBuffer([]byte("@A62120\r@A62120\r"))
 				codec := NewMessageCodec(instance, _options...)
 				t.Cleanup(func() {
@@ -197,6 +201,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
+				require.NoError(t, instance.Connect())
 				instance.FillReadBuffer([]byte("what on earth\n\r"))
 				codec := NewMessageCodec(instance, _options...)
 				t.Cleanup(func() {
@@ -219,6 +224,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
+				require.NoError(t, instance.Connect())
 				instance.FillReadBuffer([]byte("AFFE!!!\r"))
 				codec := NewMessageCodec(instance, _options...)
 				t.Cleanup(func() {
@@ -273,6 +279,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
+				require.NoError(t, instance.Connect())
 				instance.FillReadBuffer([]byte("@1A2001!!!\r"))
 				codec := NewMessageCodec(instance, _options...)
 				t.Cleanup(func() {
@@ -295,6 +302,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
+				require.NoError(t, instance.Connect())
 				instance.FillReadBuffer([]byte("86040200F940380001000000000000000008000000000000000000000000FA\r\n"))
 				codec := NewMessageCodec(instance, _options...)
 				t.Cleanup(func() {
@@ -502,6 +510,7 @@ func TestMessageCodec_Receive(t *testing.T) {
 
 				transport := test.NewTransport(_options...)
 				instance := test.NewTransportInstance(transport, _options...)
+				require.NoError(t, instance.Connect())
 				instance.FillReadBuffer([]byte("0531AC0079042F0401430316000011\r\n"))
 				codec := NewMessageCodec(instance, _options...)
 				t.Cleanup(func() {
@@ -594,6 +603,7 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
 
 		transport := test.NewTransport(_options...)
 		transportInstance := test.NewTransportInstance(transport, _options...)
+		require.NoError(t, transportInstance.Connect())
 		codec := NewMessageCodec(transportInstance, _options...)
 		t.Cleanup(func() {
 			assert.Error(t, codec.Disconnect())
@@ -630,6 +640,7 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
 
 		transport := test.NewTransport(_options...)
 		transportInstance := test.NewTransportInstance(transport, _options...)
+		require.NoError(t, transportInstance.Connect())
 		codec := NewMessageCodec(transportInstance, _options...)
 		t.Cleanup(func() {
 			assert.Error(t, codec.Disconnect())
@@ -669,6 +680,7 @@ func TestMessageCodec_Receive_Delayed_Response(t *testing.T) {
 
 		transport := test.NewTransport(_options...)
 		transportInstance := test.NewTransportInstance(transport, _options...)
+		require.NoError(t, transportInstance.Connect())
 		codec := NewMessageCodec(transportInstance, _options...)
 		t.Cleanup(func() {
 			assert.Error(t, codec.Disconnect())
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 4a4c639b80..80506ca3ec 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -98,7 +98,7 @@ func TestReader_Read(t *testing.T) {
 			},
 			wantAsserter: func(t *testing.T, results <-chan apiModel.PlcReadRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
-				defer timer.Stop()
+				defer utils.CleanupTimer(timer)
 				select {
 				case <-timer.C:
 					t.Fail()
@@ -115,6 +115,7 @@ func TestReader_Read(t *testing.T) {
 				alphaGenerator: tt.fields.alphaGenerator,
 				messageCodec:   tt.fields.messageCodec,
 				tm:             tt.fields.tm,
+				log:            testutils.ProduceTestingLogger(t),
 			}
 			assert.Truef(t, tt.wantAsserter(t, m.Read(tt.args.ctx, tt.args.readRequest)), "Read(%v, %v)", tt.args.ctx, tt.args.readRequest)
 		})
@@ -150,7 +151,7 @@ func TestReader_readSync(t *testing.T) {
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
-				defer timer.Stop()
+				defer utils.CleanupTimer(timer)
 				select {
 				case <-timer.C:
 					t.Fail()
@@ -196,7 +197,7 @@ func TestReader_readSync(t *testing.T) {
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
-				defer timer.Stop()
+				defer utils.CleanupTimer(timer)
 				select {
 				case <-timer.C:
 					t.Fail()
@@ -220,7 +221,7 @@ func TestReader_readSync(t *testing.T) {
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
-				defer timer.Stop()
+				defer utils.CleanupTimer(timer)
 				select {
 				case <-timer.C:
 					t.Fail()
@@ -290,7 +291,7 @@ func TestReader_readSync(t *testing.T) {
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
-				defer timer.Stop()
+				defer utils.CleanupTimer(timer)
 				select {
 				case <-timer.C:
 					t.Fail()
@@ -350,7 +351,7 @@ func TestReader_readSync(t *testing.T) {
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
-				defer timer.Stop()
+				defer utils.CleanupTimer(timer)
 				select {
 				case <-timer.C:
 					t.Fail()
@@ -370,6 +371,7 @@ func TestReader_readSync(t *testing.T) {
 				alphaGenerator: tt.fields.alphaGenerator,
 				messageCodec:   tt.fields.messageCodec,
 				tm:             tt.fields.tm,
+				log:            testutils.ProduceTestingLogger(t),
 			}
 			m.readSync(tt.args.ctx, tt.args.readRequest, tt.args.result)
 			assert.True(t, tt.resultEvaluator(t, tt.args.result))


[plc4x] 04/05: test(plc4go/cbus): fix for nasty test which only runs on trace now...

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 725471b2acb692d5886f84b249673283d18a12be
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 21:35:03 2023 +0200

    test(plc4go/cbus): fix for nasty test which only runs on trace now...
    
    + should be fixed asap
---
 plc4go/internal/cbus/Discoverer_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 5768787c5c..b96f45ec6f 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -155,6 +155,7 @@ func TestDiscoverer_Discover(t *testing.T) {
 }
 
 func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
+	config.TraceDefaultMessageCodecWorker = true
 	type fields struct {
 		transportInstanceCreationQueue pool.Executor
 		deviceScanningQueue            pool.Executor


[plc4x] 03/05: fix(plc4go/spi): avoid shutting down the shared executor

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 0784d3786f066b61c4a637f2d0063faf017971b6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 21:16:26 2023 +0200

    fix(plc4go/spi): avoid shutting down the shared executor
---
 plc4go/spi/transactions/RequestTransactionManager.go | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index e1ffd98ed2..786541d37d 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -41,6 +41,9 @@ var sharedExecutorInstance pool.Executor // shared instance
 func init() {
 	sharedExecutorInstance = pool.NewFixedSizeExecutor(runtime.NumCPU(), 100, pool.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
 	sharedExecutorInstance.Start()
+	runtime.SetFinalizer(sharedExecutorInstance, func(sharedExecutorInstance pool.Executor) {
+		sharedExecutorInstance.Stop()
+	})
 }
 
 type RequestTransactionRunnable func(RequestTransaction)
@@ -258,8 +261,12 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 	r.runningRequestMutex.Lock()
 	defer r.runningRequestMutex.Unlock()
 	r.runningRequests = nil
-	if err := r.executor.Close(); err != nil {
-		return errors.Wrap(err, "error closing executor")
+	if r.executor != sharedExecutorInstance {
+		if err := r.executor.Close(); err != nil {
+			return errors.Wrap(err, "error closing executor")
+		}
+	} else {
+		r.log.Warn().Msg("not closing shared instance")
 	}
 	return nil
 }


[plc4x] 01/05: fix(plc4go/spi): test transport instance panics if worked with on disconnected state

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 3c7acbdfa5f310cb1119f7fc029e5aa2ed5744b6
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 20:33:44 2023 +0200

    fix(plc4go/spi): test transport instance panics if worked with on disconnected state
---
 plc4go/internal/cbus/Connection_test.go            |  6 +++++
 plc4go/spi/transports/test/TransportInstance.go    | 24 ++++++++++++++++++
 .../spi/transports/test/TransportInstance_test.go  | 29 ++++++++++++++++++++--
 3 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 42671b3a4b..d0924f7674 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -900,6 +900,7 @@ func TestConnection_sendCalDataWrite(t *testing.T) {
 				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
 				require.NoError(t, err)
 				codec := NewMessageCodec(ti, _options...)
+				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
 					assert.Error(t, codec.Disconnect())
 				})
@@ -973,6 +974,7 @@ func TestConnection_sendReset(t *testing.T) {
 				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
 				require.NoError(t, err)
 				codec := NewMessageCodec(ti, _options...)
+				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
 					assert.Error(t, codec.Disconnect())
 				})
@@ -1046,6 +1048,7 @@ func TestConnection_setApplicationFilter(t *testing.T) {
 				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
 				require.NoError(t, err)
 				codec := NewMessageCodec(ti, _options...)
+				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
 					assert.Error(t, codec.Disconnect())
 				})
@@ -1119,6 +1122,7 @@ func TestConnection_setInterface1PowerUpSettings(t *testing.T) {
 				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
 				require.NoError(t, err)
 				codec := NewMessageCodec(ti, _options...)
+				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
 					assert.Error(t, codec.Disconnect())
 				})
@@ -1192,6 +1196,7 @@ func TestConnection_setInterfaceOptions1(t *testing.T) {
 				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
 				require.NoError(t, err)
 				codec := NewMessageCodec(ti, _options...)
+				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
 					assert.Error(t, codec.Disconnect())
 				})
@@ -1265,6 +1270,7 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
 				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
 				require.NoError(t, err)
 				codec := NewMessageCodec(ti, _options...)
+				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
 					assert.Error(t, codec.Disconnect())
 				})
diff --git a/plc4go/spi/transports/test/TransportInstance.go b/plc4go/spi/transports/test/TransportInstance.go
index 8a17d4ec8e..972a5f4b8c 100644
--- a/plc4go/spi/transports/test/TransportInstance.go
+++ b/plc4go/spi/transports/test/TransportInstance.go
@@ -88,6 +88,9 @@ func (m *TransportInstance) IsConnected() bool {
 }
 
 func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
+	if !m.IsConnected() {
+		panic(errors.New("working on a unconnected connection"))
+	}
 	m.dataMutex.RLock()
 	defer m.dataMutex.RUnlock()
 	readableBytes := len(m.readBuffer)
@@ -96,6 +99,9 @@ func (m *TransportInstance) GetNumBytesAvailableInBuffer() (uint32, error) {
 }
 
 func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, reader *bufio.Reader) bool) error {
+	if !m.IsConnected() {
+		panic(errors.New("working on a unconnected connection"))
+	}
 	m.log.Trace().Msg("Fill the buffer")
 	nBytes := uint32(1)
 	for {
@@ -117,6 +123,9 @@ func (m *TransportInstance) FillBuffer(until func(pos uint, currentByte byte, re
 }
 
 func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
+	if !m.IsConnected() {
+		panic(errors.New("working on a unconnected connection"))
+	}
 	m.dataMutex.RLock()
 	defer m.dataMutex.RUnlock()
 	availableBytes := uint32(math.Min(float64(numBytes), float64(len(m.readBuffer))))
@@ -133,6 +142,9 @@ func (m *TransportInstance) PeekReadableBytes(numBytes uint32) ([]byte, error) {
 }
 
 func (m *TransportInstance) Read(numBytes uint32) ([]byte, error) {
+	if !m.IsConnected() {
+		panic(errors.New("working on a unconnected connection"))
+	}
 	m.dataMutex.Lock()
 	defer m.dataMutex.Unlock()
 	nBytes := len(m.readBuffer)
@@ -151,6 +163,9 @@ func (m *TransportInstance) SetWriteInterceptor(writeInterceptor func(transportI
 }
 
 func (m *TransportInstance) Write(data []byte) error {
+	if !m.IsConnected() {
+		panic(errors.New("working on a unconnected connection"))
+	}
 	if m.writeInterceptor != nil {
 		m.log.Trace().Msgf("Passing data to write interceptor\n%s", hex.Dump(data))
 		m.writeInterceptor(m, data)
@@ -163,6 +178,9 @@ func (m *TransportInstance) Write(data []byte) error {
 }
 
 func (m *TransportInstance) FillReadBuffer(data []byte) {
+	if !m.IsConnected() {
+		panic(errors.New("working on a unconnected connection"))
+	}
 	m.dataMutex.Lock()
 	defer m.dataMutex.Unlock()
 	m.log.Trace().Msgf("fill read buffer with \n%s (%d bytes). (Adding to %d bytes existing)", hex.Dump(data), len(data), len(m.readBuffer))
@@ -170,6 +188,9 @@ func (m *TransportInstance) FillReadBuffer(data []byte) {
 }
 
 func (m *TransportInstance) GetNumDrainableBytes() uint32 {
+	if !m.IsConnected() {
+		panic(errors.New("working on a unconnected connection"))
+	}
 	m.dataMutex.RLock()
 	defer m.dataMutex.RUnlock()
 	m.log.Trace().Msg("get number of drainable bytes")
@@ -177,6 +198,9 @@ func (m *TransportInstance) GetNumDrainableBytes() uint32 {
 }
 
 func (m *TransportInstance) DrainWriteBuffer(numBytes uint32) []byte {
+	if !m.IsConnected() {
+		panic(errors.New("working on a unconnected connection"))
+	}
 	m.dataMutex.Lock()
 	defer m.dataMutex.Unlock()
 	m.log.Trace().Msgf("Drain write buffer with number of bytes %d", numBytes)
diff --git a/plc4go/spi/transports/test/TransportInstance_test.go b/plc4go/spi/transports/test/TransportInstance_test.go
index dcb2069032..36fdb95e97 100644
--- a/plc4go/spi/transports/test/TransportInstance_test.go
+++ b/plc4go/spi/transports/test/TransportInstance_test.go
@@ -175,6 +175,9 @@ func TestTransportInstance_DrainWriteBuffer(t *testing.T) {
 	}{
 		{
 			name: "drain it",
+			fields: fields{
+				connected: true,
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -212,6 +215,9 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 	}{
 		{
 			name: "fill it (errors)",
+			fields: fields{
+				connected: true,
+			},
 			args: args{
 				until: func(pos uint, currentByte byte, reader *bufio.Reader) bool {
 					return pos < 3
@@ -222,6 +228,7 @@ func TestTransportInstance_FillBuffer(t *testing.T) {
 		{
 			name: "fill it",
 			fields: fields{
+				connected:  true,
 				readBuffer: []byte{1, 2, 3, 4},
 			},
 			args: args{
@@ -266,6 +273,7 @@ func TestTransportInstance_FillReadBuffer(t *testing.T) {
 		{
 			name: "fill it",
 			fields: fields{
+				connected:  true,
 				readBuffer: []byte{1, 2, 3, 4},
 			},
 			args: args{
@@ -303,10 +311,14 @@ func TestTransportInstance_GetNumBytesAvailableInBuffer(t *testing.T) {
 	}{
 		{
 			name: "get it",
+			fields: fields{
+				connected: true,
+			},
 		},
 		{
-			name: "get it",
+			name: "get it too",
 			fields: fields{
+				connected:  true,
 				readBuffer: []byte{1, 2, 3, 4},
 			},
 			want: 4,
@@ -348,6 +360,9 @@ func TestTransportInstance_GetNumDrainableBytes(t *testing.T) {
 	}{
 		{
 			name: "get it",
+			fields: fields{
+				connected: true,
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -419,6 +434,9 @@ func TestTransportInstance_PeekReadableBytes(t *testing.T) {
 	}{
 		{
 			name: "peek it",
+			fields: fields{
+				connected: true,
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -461,7 +479,10 @@ func TestTransportInstance_Read(t *testing.T) {
 		wantErr bool
 	}{
 		{
-			name:    "read it",
+			name: "read it",
+			fields: fields{
+				connected: true,
+			},
 			wantErr: true,
 		},
 	}
@@ -573,10 +594,14 @@ func TestTransportInstance_Write(t *testing.T) {
 	}{
 		{
 			name: "write it",
+			fields: fields{
+				connected: true,
+			},
 		},
 		{
 			name: "write it",
 			fields: fields{
+				connected: true,
 				writeInterceptor: func(transportInstance *TransportInstance, data []byte) {
 					assert.NotNil(t, transportInstance)
 					assert.Equal(t, []byte{1, 2, 3, 4}, data)


[plc4x] 02/05: fix(plc4go/spi): re-order disconnection on Default codec

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 3456bcaa543f941c33f27a6e0b01d67a6c3be3e2
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 20:34:05 2023 +0200

    fix(plc4go/spi): re-order disconnection on Default codec
---
 plc4go/spi/default/DefaultCodec.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 78c3742f50..1c616c4885 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -160,13 +160,13 @@ func (m *defaultCodec) Disconnect() error {
 	}
 	m.log.Trace().Msg("Disconnecting")
 	m.running.Store(false)
+	m.log.Trace().Msg("Waiting for worker to shutdown")
+	m.activeWorker.Wait()
 	if m.transportInstance != nil {
 		if err := m.transportInstance.Close(); err != nil {
 			return errors.Wrap(err, "error closing transport instance")
 		}
 	}
-	m.log.Trace().Msg("Waiting for worker to shutdown")
-	m.activeWorker.Wait()
 	m.log.Trace().Msg("disconnected")
 	return nil
 }