You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 21:35:42 UTC

[plc4x] branch develop updated (c02f8f51de -> 5eca78479c)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from c02f8f51de chore(plc4go): re-order time declarations
     new ac957705d6 refactor(plc4go/spi): move pool option to other options
     new 5eca78479c feat(plc4go): add receive timeout option

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/CBusMessageMapper_test.go     | 42 ++++++++++-----------
 plc4go/internal/cbus/Discoverer_test.go            |  3 +-
 plc4go/pkg/api/config/config.go                    |  9 ++++-
 plc4go/spi/default/DefaultCodec.go                 | 23 +++++++++++-
 plc4go/spi/default/DefaultCodec_test.go            | 18 +++++++--
 plc4go/spi/options/Option.go                       | 43 ++++++++++++++++++++++
 plc4go/spi/pool/WorkerPool.go                      | 37 +------------------
 plc4go/spi/pool/WorkerPool_test.go                 | 28 ++------------
 plc4go/spi/testutils/TestUtils.go                  |  5 +--
 .../spi/transactions/RequestTransactionManager.go  |  8 +++-
 10 files changed, 123 insertions(+), 93 deletions(-)


[plc4x] 02/02: feat(plc4go): add receive timeout option

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 5eca78479cd542acbf0478f87606adf47798e6ea
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:35:34 2023 +0200

    feat(plc4go): add receive timeout option
---
 plc4go/internal/cbus/Discoverer_test.go |  3 +--
 plc4go/pkg/api/config/config.go         |  6 ++++++
 plc4go/spi/default/DefaultCodec.go      | 23 ++++++++++++++++++++++-
 plc4go/spi/default/DefaultCodec_test.go | 18 +++++++++++++++---
 plc4go/spi/options/Option.go            | 22 ++++++++++++++++++++++
 5 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 544ac2d843..9e8270929e 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -29,7 +29,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/apache/plc4x/plc4go/pkg/api/config"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
@@ -156,7 +155,6 @@ func TestDiscoverer_Discover(t *testing.T) {
 }
 
 func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
-	config.TraceDefaultMessageCodecWorker = true
 	type fields struct {
 		transportInstanceCreationQueue pool.Executor
 		deviceScanningQueue            pool.Executor
@@ -200,6 +198,7 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 					t.Logf("%d bytes written", write)
 				}()
 				t.Cleanup(func() {
+					t.Log("close listener")
 					if err := listen.Close(); err != nil {
 						t.Error(err)
 					}
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index 6e06449e3b..c731f181db 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -22,6 +22,7 @@ package config
 import (
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/rs/zerolog"
+	"time"
 )
 
 // TraceTransactionManagerWorkers when set to true the transaction manager displays worker states in log
@@ -49,6 +50,11 @@ func WithPassLoggerToModel(passLogger bool) WithOption {
 	return options.WithPassLoggerToModel(passLogger)
 }
 
+// WithReceiveTimeout set's a timeout for a receive-operation (similar to SO_RCVTIMEO)
+func WithReceiveTimeout(timeout time.Duration) WithOption {
+	return options.WithReceiveTimeout(timeout)
+}
+
 // WithTraceTransactionManagerWorkers enables trace transaction manager workers
 func WithTraceTransactionManagerWorkers(traceWorkers bool) WithOption {
 	return options.WithTraceTransactionManagerWorkers(traceWorkers)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 9b6efddbff..21e202d48e 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -87,6 +87,7 @@ type defaultCodec struct {
 	stateChange             sync.Mutex
 	activeWorker            sync.WaitGroup
 
+	receiveTimeout                 time.Duration
 	traceDefaultMessageCodecWorker bool
 
 	log zerolog.Logger `ignore:"true"`
@@ -108,6 +109,7 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
 		defaultIncomingMessageChannel:  make(chan spi.Message, 100),
 		expectations:                   []spi.Expectation{},
 		customMessageHandling:          customMessageHandler,
+		receiveTimeout:                 options.ExtractReceiveTimeout(_options...),
 		traceDefaultMessageCodecWorker: options.ExtractTraceDefaultMessageCodecWorker(_options...) || config.TraceDefaultMessageCodecWorker,
 		log:                            options.ExtractCustomLogger(_options...),
 	}
@@ -164,6 +166,7 @@ func (m *defaultCodec) Disconnect() error {
 	m.running.Store(false)
 	m.log.Trace().Msg("Waiting for worker to shutdown")
 	m.activeWorker.Wait()
+	m.log.Trace().Msg("worker shut down")
 	if m.transportInstance != nil {
 		if err := m.transportInstance.Close(); err != nil {
 			return errors.Wrap(err, "error closing transport instance")
@@ -318,7 +321,25 @@ mainLoop:
 
 		workerLog.Trace().Msg("Receiving message")
 		// Check for incoming messages.
-		message, err := m.Receive()
+		var message spi.Message
+		var err error
+		{
+			syncer := make(chan struct{})
+			go func() {
+				message, err = m.Receive()
+				close(syncer)
+			}()
+			timeoutTimer := time.NewTimer(m.receiveTimeout)
+			select {
+			case <-syncer:
+				utils.CleanupTimer(timeoutTimer)
+			case <-timeoutTimer.C:
+				utils.CleanupTimer(timeoutTimer)
+				workerLog.Error().Msgf("receive timeout after %s", m.receiveTimeout)
+				continue mainLoop
+			}
+
+		}
 		if err != nil {
 			workerLog.Error().Err(err).Msg("got an error reading from transport")
 			time.Sleep(10 * time.Millisecond)
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index 19bde057ac..47ee666559 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -249,7 +249,8 @@ func TestNewDefaultCodec(t *testing.T) {
 		{
 			name: "create it",
 			want: &defaultCodec{
-				expectations: []spi.Expectation{},
+				expectations:   []spi.Expectation{},
+				receiveTimeout: 10 * time.Second,
 			},
 		},
 	}
@@ -298,7 +299,8 @@ func Test_buildDefaultCodec(t *testing.T) {
 		{
 			name: "build it",
 			want: &defaultCodec{
-				expectations: []spi.Expectation{},
+				expectations:   []spi.Expectation{},
+				receiveTimeout: 10 * time.Second,
 			},
 		},
 		{
@@ -309,7 +311,8 @@ func Test_buildDefaultCodec(t *testing.T) {
 				},
 			},
 			want: &defaultCodec{
-				expectations: []spi.Expectation{},
+				expectations:   []spi.Expectation{},
+				receiveTimeout: 10 * time.Second,
 			},
 		},
 	}
@@ -897,6 +900,10 @@ func Test_defaultCodec_Work(t *testing.T) {
 				codec.running.Store(true)
 				codec.activeWorker.Add(1)
 			},
+			mockSetup: func(t *testing.T, fields *fields, args *args) {
+				requirements := NewMockDefaultCodecRequirements(t)
+				fields.DefaultCodecRequirements = requirements
+			},
 		},
 		{
 			name: "work hard (panics everywhere)",
@@ -934,6 +941,11 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
+			mockSetup: func(t *testing.T, fields *fields, args *args) {
+				requirements := NewMockDefaultCodecRequirements(t)
+				requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
+				fields.DefaultCodecRequirements = requirements
+			},
 			manipulator: func(t *testing.T, codec *defaultCodec) {
 				codec.running.Store(true)
 				codec.activeWorker.Add(1)
diff --git a/plc4go/spi/options/Option.go b/plc4go/spi/options/Option.go
index 3d5907b88c..e3dbc9fc48 100644
--- a/plc4go/spi/options/Option.go
+++ b/plc4go/spi/options/Option.go
@@ -22,6 +22,7 @@ package options
 import (
 	"context"
 	"github.com/rs/zerolog"
+	"time"
 )
 
 // WithOption is a marker interface for options supplied by the builders like WithDefaultTtl
@@ -59,6 +60,22 @@ func WithPassLoggerToModel(passLogger bool) WithOption {
 	return withPassLoggerToModel{passLogger: passLogger}
 }
 
+// WithReceiveTimeout set's a timeout for a receive-operation (similar to SO_RCVTIMEO)
+func WithReceiveTimeout(timeout time.Duration) WithOption {
+	return withReceiveTimeout{timeout: timeout}
+}
+
+// ExtractReceiveTimeout to extract the receive-timeout for reading operations. Defaults to 10 seconds
+func ExtractReceiveTimeout(options ...WithOption) time.Duration {
+	for _, option := range options {
+		switch option := option.(type) {
+		case withReceiveTimeout:
+			return option.timeout
+		}
+	}
+	return 10 * time.Second
+}
+
 // ExtractPassLoggerToModel to extract the flag indicating that model should be passed to Model
 func ExtractPassLoggerToModel(options ...WithOption) bool {
 	for _, option := range options {
@@ -167,6 +184,11 @@ type withPassLoggerToModel struct {
 	passLogger bool
 }
 
+type withReceiveTimeout struct {
+	Option
+	timeout time.Duration
+}
+
 type withTraceTransactionManagerWorkers struct {
 	Option
 	traceWorkers bool


[plc4x] 01/02: refactor(plc4go/spi): move pool option to other options

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit ac957705d69a694b5f8b3670cfa7e5e798df6cc9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:18:28 2023 +0200

    refactor(plc4go/spi): move pool option to other options
---
 plc4go/internal/cbus/CBusMessageMapper_test.go     | 42 +++++++++++-----------
 plc4go/pkg/api/config/config.go                    |  3 +-
 plc4go/spi/options/Option.go                       | 21 +++++++++++
 plc4go/spi/pool/WorkerPool.go                      | 37 ++-----------------
 plc4go/spi/pool/WorkerPool_test.go                 | 28 ++-------------
 plc4go/spi/testutils/TestUtils.go                  |  5 ++-
 .../spi/transactions/RequestTransactionManager.go  |  8 ++++-
 7 files changed, 57 insertions(+), 87 deletions(-)

diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 3dbdf0f1ea..aa9a8a90a0 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1544,7 +1544,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -1619,7 +1619,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -1683,7 +1683,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -1737,7 +1737,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -1794,7 +1794,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -1871,7 +1871,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -1966,7 +1966,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2024,7 +2024,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2078,7 +2078,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2132,7 +2132,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2186,7 +2186,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2240,7 +2240,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2314,7 +2314,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2368,7 +2368,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2422,7 +2422,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2476,7 +2476,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2530,7 +2530,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2584,7 +2584,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2654,7 +2654,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2708,7 +2708,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
@@ -2762,7 +2762,7 @@ func TestMapEncodedReply(t *testing.T) {
 					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
 					options.WithPassLoggerToModel(true),
 					transactions.WithCustomExecutor(
-						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+						pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
 					),
 				)
 				t.Cleanup(func() {
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index ba08e54ee3..6e06449e3b 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -21,7 +21,6 @@ package config
 
 import (
 	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/apache/plc4x/plc4go/spi/pool"
 	"github.com/rs/zerolog"
 )
 
@@ -67,7 +66,7 @@ func WithTraceDefaultMessageCodecWorker(traceWorkers bool) WithOption {
 
 // WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
 func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
-	return pool.WithExecutorOptionTracerWorkers(traceWorkers)
+	return options.WithExecutorOptionTracerWorkers(traceWorkers)
 }
 
 // WithOption is a marker interface for options
diff --git a/plc4go/spi/options/Option.go b/plc4go/spi/options/Option.go
index 7ebe9641ae..3d5907b88c 100644
--- a/plc4go/spi/options/Option.go
+++ b/plc4go/spi/options/Option.go
@@ -118,6 +118,22 @@ func ExtractTraceDefaultMessageCodecWorker(options ...WithOption) bool {
 	return false
 }
 
+// WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
+func WithExecutorOptionTracerWorkers(traceWorkers bool) WithOption {
+	return &withTracerExecutorWorkersOption{traceWorkers: traceWorkers}
+}
+
+// ExtractTracerWorkers returns the value from WithExecutorOptionTracerWorkers
+func ExtractTracerWorkers(_options ...WithOption) (traceWorkers bool, found bool) {
+	for _, option := range _options {
+		switch option := option.(type) {
+		case *withTracerExecutorWorkersOption:
+			return option.traceWorkers, true
+		}
+	}
+	return false, false
+}
+
 // GetLoggerContextForModel returns a log context if the WithPassLoggerToModel WithOption is set
 func GetLoggerContextForModel(ctx context.Context, log zerolog.Logger, options ...WithOption) context.Context {
 	passToModel := false
@@ -166,6 +182,11 @@ type withTraceDefaultMessageCodecWorker struct {
 	traceWorkers bool
 }
 
+type withTracerExecutorWorkersOption struct {
+	Option
+	traceWorkers bool
+}
+
 //
 //
 ///////////////////////////////////////
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 36dd72f494..c9e21b6581 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -60,7 +60,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
 		worker:     workers,
 		log:        customLogger,
 	}
-	_executor.traceWorkers, _ = ExtractTracerWorkers(_options...)
+	_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
 	for i := 0; i < numberOfWorkers; i++ {
 		workers[i].executor = _executor
 	}
@@ -77,7 +77,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
 		},
 		maxNumberOfWorkers: maxNumberOfWorkers,
 	}
-	_executor.traceWorkers, _ = ExtractTracerWorkers(_options...)
+	_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
 	// We spawn one initial worker
 	w := worker{
 		id:          0,
@@ -89,36 +89,3 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
 	_executor.worker = append(_executor.worker, &w)
 	return _executor
 }
-
-// WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
-func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
-	return &tracerWorkersOption{traceWorkers: traceWorkers}
-}
-
-// ExtractTracerWorkers returns the value from WithExecutorOptionTracerWorkers
-func ExtractTracerWorkers(_options ...options.WithOption) (traceWorkers bool, found bool) {
-	for _, option := range _options {
-		switch option := option.(type) {
-		case *tracerWorkersOption:
-			return option.traceWorkers, true
-		}
-	}
-	return false, false
-}
-
-///////////////////////////////////////
-///////////////////////////////////////
-//
-// Internal section
-//
-
-type tracerWorkersOption struct {
-	options.Option
-	traceWorkers bool
-}
-
-//
-// Internal section
-//
-///////////////////////////////////////
-///////////////////////////////////////
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index fc67a926ac..5e67092467 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -46,7 +46,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
 			args: args{
 				numberOfWorkers: 13,
 				queueDepth:      14,
-				options:         []options.WithOption{WithExecutorOptionTracerWorkers(true)},
+				options:         []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
 			},
 			setup: func(t *testing.T, args *args) {
 				args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
@@ -86,7 +86,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 			args: args{
 				numberOfWorkers: 13,
 				queueDepth:      14,
-				options:         []options.WithOption{WithExecutorOptionTracerWorkers(true)},
+				options:         []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
 			},
 			setup: func(t *testing.T, args *args) {
 				args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
@@ -104,7 +104,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 			args: args{
 				numberOfWorkers: 2,
 				queueDepth:      2,
-				options:         []options.WithOption{WithExecutorOptionTracerWorkers(true)},
+				options:         []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
 			},
 			setup: func(t *testing.T, args *args) {
 				args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
@@ -175,25 +175,3 @@ func TestNewDynamicExecutor(t *testing.T) {
 		})
 	}
 }
-
-func TestWithExecutorOptionTracerWorkers(t *testing.T) {
-	type args struct {
-		traceWorkers bool
-	}
-	tests := []struct {
-		name              string
-		args              args
-		executorValidator options.WithOption
-	}{
-		{
-			name:              "option should set option",
-			args:              args{traceWorkers: true},
-			executorValidator: &tracerWorkersOption{traceWorkers: true},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			assert.Equal(t, tt.executorValidator, WithExecutorOptionTracerWorkers(tt.args.traceWorkers))
-		})
-	}
-}
diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go
index e3937a7ff3..66c1b6f907 100644
--- a/plc4go/spi/testutils/TestUtils.go
+++ b/plc4go/spi/testutils/TestUtils.go
@@ -22,7 +22,6 @@ package testutils
 import (
 	"context"
 	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/apache/plc4x/plc4go/spi/pool"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	"os"
 	"runtime/debug"
@@ -155,14 +154,14 @@ func ProduceTestingLogger(t *testing.T) zerolog.Logger {
 // EnrichOptionsWithOptionsForTesting appends options useful for testing to config.WithOption s
 func EnrichOptionsWithOptionsForTesting(t *testing.T, _options ...options.WithOption) []options.WithOption {
 	traceWorkers := true
-	if extractedTraceWorkers, found := pool.ExtractTracerWorkers(_options...); found {
+	if extractedTraceWorkers, found := options.ExtractTracerWorkers(_options...); found {
 		traceWorkers = extractedTraceWorkers
 	}
 	// TODO: apply to other options like above
 	return append(_options,
 		options.WithCustomLogger(ProduceTestingLogger(t)),
 		options.WithPassLoggerToModel(true),
-		pool.WithExecutorOptionTracerWorkers(traceWorkers),
+		options.WithExecutorOptionTracerWorkers(traceWorkers),
 	)
 }
 
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index ffbc4710f2..55cef95f96 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -24,6 +24,7 @@ import (
 	"context"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
+	"github.com/rs/zerolog/log"
 	"io"
 	"runtime"
 	"sync"
@@ -39,7 +40,12 @@ import (
 var sharedExecutorInstance pool.Executor // shared instance
 
 func init() {
-	sharedExecutorInstance = pool.NewFixedSizeExecutor(runtime.NumCPU(), 100, pool.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
+	sharedExecutorInstance = pool.NewFixedSizeExecutor(
+		runtime.NumCPU(),
+		100,
+		options.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers),
+		config.WithCustomLogger(log.With().Str("executorInstance", "shared logger").Logger()),
+	)
 	sharedExecutorInstance.Start()
 	runtime.SetFinalizer(sharedExecutorInstance, func(sharedExecutorInstance pool.Executor) {
 		sharedExecutorInstance.Stop()