You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 11:02:13 UTC

[plc4x] 04/05: test(plc4go/cbus): fixed possible concurrency issues in test

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 7727bc814ba86c61c02f30c877c7651589b5f87d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 13:01:32 2023 +0200

    test(plc4go/cbus): fixed possible concurrency issues in test
---
 plc4go/internal/cbus/CBusMessageMapper_test.go     | 152 +++++++++------------
 plc4go/internal/cbus/Discoverer_test.go            |  14 +-
 plc4go/internal/cbus/Driver_test.go                |  35 +++--
 plc4go/internal/cbus/Reader.go                     |   1 +
 plc4go/internal/cbus/Reader_test.go                |  70 +++++++---
 .../spi/transactions/RequestTransactionManager.go  |   1 +
 6 files changed, 153 insertions(+), 120 deletions(-)

diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index aa9a8a90a0..1bacc4775b 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1468,11 +1468,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1539,13 +1540,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1614,13 +1614,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1678,13 +1677,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1732,13 +1730,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1789,13 +1786,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1866,13 +1862,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1961,13 +1956,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2019,13 +2013,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2073,13 +2066,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2127,13 +2119,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2181,13 +2172,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2235,13 +2225,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2309,13 +2298,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2363,13 +2351,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2417,13 +2404,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2471,13 +2457,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2525,13 +2510,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2579,13 +2563,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2649,13 +2632,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2703,13 +2685,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2757,13 +2738,12 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
+				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					options.WithPassLoggerToModel(true),
-					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
-					),
+					transactions.WithCustomExecutor(executor),
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 9e8270929e..5ac2519d9f 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -94,8 +94,11 @@ func TestDiscoverer_Discover(t *testing.T) {
 				})
 				args.ctx = ctx
 
-				fields.transportInstanceCreationQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
-				fields.deviceScanningQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor := pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				fields.transportInstanceCreationQueue = executor
+				fields.deviceScanningQueue = executor
 				return nil
 			},
 			wantErr: assert.NoError,
@@ -118,8 +121,11 @@ func TestDiscoverer_Discover(t *testing.T) {
 					time.Sleep(1 * time.Second)
 				})
 				args.ctx = ctx
-				fields.transportInstanceCreationQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
-				fields.deviceScanningQueue = pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor := pool.NewFixedSizeExecutor(50, 100, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				fields.transportInstanceCreationQueue = executor
+				fields.deviceScanningQueue = executor
 				oldaddressProviderRetriever := addressProviderRetriever
 				addressProviderRetriever = func(log zerolog.Logger, _ []string) ([]addressProvider, error) {
 					loopbackInterface, err := nettest.LoopbackInterface()
diff --git a/plc4go/internal/cbus/Driver_test.go b/plc4go/internal/cbus/Driver_test.go
index ae01bb6f69..5d92be01f8 100644
--- a/plc4go/internal/cbus/Driver_test.go
+++ b/plc4go/internal/cbus/Driver_test.go
@@ -22,6 +22,10 @@ package cbus
 import (
 	"context"
 	"fmt"
+	"net/url"
+	"testing"
+	"time"
+
 	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
@@ -31,11 +35,9 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
 	"github.com/apache/plc4x/plc4go/spi/utils"
+
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
-	"net/url"
-	"testing"
-	"time"
 )
 
 func TestDriver_DiscoverWithContext(t *testing.T) {
@@ -111,6 +113,7 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 		name         string
 		fields       fields
 		args         args
+		setup        func(t *testing.T, fields *fields, args *args)
 		wantVerifier func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool
 	}{
 		{
@@ -153,13 +156,14 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 				transportUrl: url.URL{
 					Scheme: "test",
 				},
-				transports: map[string]transports.Transport{
-					"test": test.NewTransport(),
-				},
+				transports: map[string]transports.Transport{},
 				options: map[string][]string{
 					"failTestTransport": {"yesSir"},
 				},
 			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
+			},
 			wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
 				timeout := time.NewTimer(20 * time.Millisecond)
 				defer utils.CleanupTimer(timeout)
@@ -186,13 +190,14 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 				transportUrl: url.URL{
 					Scheme: "test",
 				},
-				transports: map[string]transports.Transport{
-					"test": test.NewTransport(),
-				},
+				transports: map[string]transports.Transport{},
 				options: map[string][]string{
 					"MonitoredApplication1": {"pineapple"},
 				},
 			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
+			},
 			wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
 				timeout := time.NewTimer(20 * time.Millisecond)
 				defer utils.CleanupTimer(timeout)
@@ -218,10 +223,11 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 				transportUrl: url.URL{
 					Scheme: "test",
 				},
-				transports: map[string]transports.Transport{
-					"test": test.NewTransport(),
-				},
-				options: map[string][]string{},
+				transports: map[string]transports.Transport{},
+				options:    map[string][]string{},
+			},
+			setup: func(t *testing.T, fields *fields, args *args) {
+				args.transports["test"] = test.NewTransport(testutils.EnrichOptionsWithOptionsForTesting(t)...)
 			},
 			wantVerifier: func(t *testing.T, results <-chan plc4go.PlcConnectionConnectResult) bool {
 				timeout := time.NewTimer(20 * time.Millisecond)
@@ -240,6 +246,9 @@ func TestDriver_GetConnectionWithContext(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields, &tt.args)
+			}
 			m := &Driver{
 				DefaultDriver:           tt.fields.DefaultDriver,
 				awaitSetupComplete:      tt.fields.awaitSetupComplete,
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 70aa83016d..59fa3460e6 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -144,6 +144,7 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
 		ttl = -time.Since(deadline)
 		m.log.Debug().Msgf("setting ttl to %s", ttl)
 	}
+	m.log.Trace().Msgf("sending with ctx %s", ctx)
 	if err := m.messageCodec.SendRequest(
 		ctx,
 		messageToSend,
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index 80506ca3ec..faf5822937 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -21,25 +21,29 @@ package cbus
 
 import (
 	"context"
+	"encoding/hex"
+	"net/url"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	spiModel "github.com/apache/plc4x/plc4go/spi/model"
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/pool"
 	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
 	"github.com/apache/plc4x/plc4go/spi/utils"
+
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"net/url"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"testing"
-	"time"
 )
 
 func TestNewReader(t *testing.T) {
@@ -180,6 +184,18 @@ func TestReader_readSync(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
+				executor := pool.NewFixedSizeExecutor(5, 5)
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				transactionManager := transactions.NewRequestTransactionManager(
+					10,
+					append(_options, transactions.WithCustomExecutor(executor))...,
+				)
+				t.Cleanup(func() {
+					assert.NoError(t, transactionManager.Close())
+				})
+				fields.tm = transactionManager
+
 				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
@@ -189,11 +205,6 @@ func TestReader_readSync(t *testing.T) {
 				t.Cleanup(func() {
 					assert.NoError(t, codec.Disconnect())
 				})
-				fields.messageCodec = codec
-				fields.tm = transactions.NewRequestTransactionManager(10, _options...)
-				t.Cleanup(func() {
-					assert.NoError(t, fields.tm.Close())
-				})
 			},
 			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
@@ -254,11 +265,19 @@ func TestReader_readSync(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				fields.tm = transactions.NewRequestTransactionManager(10, _options...)
+				executor := pool.NewFixedSizeExecutor(5, 5, _options...)
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				transactionManager := transactions.NewRequestTransactionManager(
+					10,
+					append(_options, transactions.WithCustomExecutor(executor))...,
+				)
 				t.Cleanup(func() {
-					assert.NoError(t, fields.tm.Close())
+					assert.NoError(t, transactionManager.Close())
 				})
-				transport := test.NewTransport()
+				fields.tm = transactionManager
+
+				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
 				require.NoError(t, err)
@@ -271,6 +290,7 @@ func TestReader_readSync(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -333,12 +353,19 @@ func TestReader_readSync(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				fields.tm = transactions.NewRequestTransactionManager(10, _options...)
+				executor := pool.NewFixedSizeExecutor(5, 5)
+				executor.Start()
+				t.Cleanup(executor.Stop)
+				transactionManager := transactions.NewRequestTransactionManager(
+					10,
+					append(_options, transactions.WithCustomExecutor(executor))...,
+				)
 				t.Cleanup(func() {
-					assert.NoError(t, fields.tm.Close())
+					assert.NoError(t, transactionManager.Close())
 				})
+				fields.tm = transactionManager
 
-				transport := test.NewTransport()
+				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
 				require.NoError(t, err)
@@ -374,6 +401,7 @@ func TestReader_readSync(t *testing.T) {
 				log:            testutils.ProduceTestingLogger(t),
 			}
 			m.readSync(tt.args.ctx, tt.args.readRequest, tt.args.result)
+			t.Log("done read sync")
 			assert.True(t, tt.resultEvaluator(t, tt.args.result))
 		})
 	}
@@ -502,6 +530,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -574,6 +603,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -662,6 +692,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -743,6 +774,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -824,6 +856,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -905,6 +938,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -986,6 +1020,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -1067,6 +1102,7 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				currentState.Store(INITIAL)
 				stateChangeMutex := sync.Mutex{}
 				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 4d8105b7f3..83551e0805 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -274,5 +274,6 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 	} else {
 		r.log.Warn().Msg("not closing shared instance")
 	}
+	r.log.Debug().Msg("closed")
 	return nil
 }