Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/13 12:17:21 UTC

[plc4x] branch develop updated (ff1810481f -> 5422176096)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from ff1810481f build(deps): bump asciidoctorj-diagram from 2.2.8 to 2.2.9 (#986)
     new d7d54912ee refactor(plc4go/spi): split up request transaction into separate file
     new ade510700a refactor(plc4go/spi): split up pool into multiple files
     new caa9718473 feat(plc4go/spi): added more Stringer implementations
     new 5422176096 feat(plc4go/cbus): added more Stringer implementations

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/Configuration.go              |   5 +
 plc4go/internal/cbus/Configuration_test.go         |  60 ++
 plc4go/internal/cbus/Connection.go                 |  46 +-
 plc4go/internal/cbus/Connection_test.go            |  11 +-
 plc4go/internal/cbus/DriverContext.go              |   6 +
 plc4go/internal/cbus/MessageCodec.go               |  21 +
 plc4go/internal/cbus/MessageCodec_test.go          |  49 ++
 plc4go/internal/cbus/Subscriber.go                 |  76 +--
 plc4go/internal/cbus/Subscriber_test.go            |  96 ++-
 plc4go/spi/default/DefaultCodec.go                 |  17 +
 plc4go/spi/default/DefaultCodec_test.go            |  43 ++
 plc4go/spi/default/DefaultConnection.go            |  15 +
 plc4go/spi/default/DefaultConnection_test.go       |  40 ++
 plc4go/spi/pool/CompletionFuture.go                |  90 +++
 plc4go/spi/pool/CompletionFuture_test.go           | 191 ++++++
 plc4go/spi/pool/WorkerPool.go                      | 393 +-----------
 plc4go/spi/pool/WorkerPool_test.go                 | 689 ---------------------
 plc4go/spi/pool/dynamicExecutor.go                 | 186 ++++++
 plc4go/spi/pool/dynamicExecutor_test.go            | 213 +++++++
 plc4go/spi/pool/executor.go                        | 148 +++++
 plc4go/spi/pool/executor_test.go                   | 479 ++++++++++++++
 .../model/plc_message.go => spi/pool/workItem.go}  |  21 +-
 .../pool/workItem_test.go}                         |  34 +-
 plc4go/spi/pool/worker.go                          | 113 ++++
 plc4go/spi/pool/worker_test.go                     | 253 ++++++++
 plc4go/spi/transactions/RequestTransaction.go      | 148 +++++
 .../spi/transactions/RequestTransactionManager.go  | 140 +----
 .../transactions/RequestTransactionManager_test.go | 401 +++---------
 plc4go/spi/transactions/RequestTransaction_test.go | 319 ++++++++++
 .../{mock_requirements.go => completedFuture.go}   |  21 +-
 plc4go/spi/transactions/completedFuture_test.go    | 106 ++++
 31 files changed, 2812 insertions(+), 1618 deletions(-)
 create mode 100644 plc4go/spi/pool/CompletionFuture.go
 create mode 100644 plc4go/spi/pool/CompletionFuture_test.go
 create mode 100644 plc4go/spi/pool/dynamicExecutor.go
 create mode 100644 plc4go/spi/pool/dynamicExecutor_test.go
 create mode 100644 plc4go/spi/pool/executor.go
 create mode 100644 plc4go/spi/pool/executor_test.go
 copy plc4go/{pkg/api/model/plc_message.go => spi/pool/workItem.go} (71%)
 copy plc4go/{internal/cbus/DriverContext_test.go => spi/pool/workItem_test.go} (63%)
 create mode 100644 plc4go/spi/pool/worker.go
 create mode 100644 plc4go/spi/pool/worker_test.go
 create mode 100644 plc4go/spi/transactions/RequestTransaction.go
 create mode 100644 plc4go/spi/transactions/RequestTransaction_test.go
 copy plc4go/spi/transactions/{mock_requirements.go => completedFuture.go} (72%)
 create mode 100644 plc4go/spi/transactions/completedFuture_test.go


[plc4x] 03/04: feat(plc4go/spi): added more Stringer implementations

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit caa9718473faa3ebb4fc71951fd3cf4ead000c17
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 14:16:57 2023 +0200

    feat(plc4go/spi): added more Stringer implementations
---
 plc4go/spi/default/DefaultCodec.go                 |  17 ++++
 plc4go/spi/default/DefaultCodec_test.go            |  43 ++++++++
 plc4go/spi/default/DefaultConnection.go            |  15 +++
 plc4go/spi/default/DefaultConnection_test.go       |  40 ++++++++
 plc4go/spi/pool/dynamicExecutor.go                 |  13 +++
 plc4go/spi/pool/dynamicExecutor_test.go            |  43 ++++++++
 plc4go/spi/pool/executor.go                        |  19 ++++
 plc4go/spi/pool/executor_test.go                   |  66 ++++++++++++
 plc4go/spi/pool/workItem.go                        |  10 +-
 plc4go/spi/pool/workItem_test.go                   |   6 +-
 plc4go/spi/pool/worker.go                          |  17 ++++
 plc4go/spi/pool/worker_test.go                     |  26 +++++
 .../spi/transactions/RequestTransactionManager.go  |  33 ++++--
 .../transactions/RequestTransactionManager_test.go | 113 +++++++++++++++++----
 plc4go/spi/transactions/completedFuture.go         |   9 +-
 plc4go/spi/transactions/completedFuture_test.go    | 106 +++++++++++++++++++
 16 files changed, 549 insertions(+), 27 deletions(-)
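
All of these follow the same pattern: each struct gains a String() method satisfying fmt.Stringer that dumps its fields one per line, with channels reported as "<n> elements" rather than printed by content. A minimal sketch of the pattern, using a hypothetical widget type rather than anything from the commit:

    package main

    import "fmt"

    // widget is a stand-in type; only the formatting pattern mirrors the commit.
    type widget struct {
        running bool
        items   chan int
    }

    // String implements fmt.Stringer in the same style as the diffs below:
    // one field per line, channels reported by their buffered length.
    func (w *widget) String() string {
        return fmt.Sprintf("widget{\n"+
            "\trunning: %t,\n"+
            "\titems: %d elements,\n"+
            "}",
            w.running,
            len(w.items),
        )
    }

    func main() {
        fmt.Println(&widget{items: make(chan int, 4)}) // prints the multi-line dump
    }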

diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 30b70557b6..afd3052c9c 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -244,6 +244,7 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
 	messageHandled := false
 	m.log.Trace().Msgf("Current number of expectations: %d", len(m.expectations))
 	for index, expectation := range m.expectations {
+		m.log.Trace().Msgf("Checking expectation %s", expectation)
 		// Check if the current message matches the expectations
 		// If it does, let it handle the message.
 		if accepts := expectation.GetAcceptsMessage()(message); accepts {
@@ -354,3 +355,19 @@ func (m *defaultCodec) passToDefaultIncomingMessageChannel(workerLog zerolog.Log
 		workerLog.Warn().Msgf("Message discarded\n%s", message)
 	}
 }
+
+func (m *defaultCodec) String() string {
+	return fmt.Sprintf("DefaultCodec{\n"+
+		"\tTransportInstance: %s,\n"+
+		"\tDefaultIncomingMessageChannel: %d elements,\n"+
+		"\tExpectations: %s,\n"+
+		"\trunning: %t,\n"+
+		"\tcustomMessageHandling: %t,\n"+
+		"}",
+		m.transportInstance,
+		len(m.defaultIncomingMessageChannel),
+		m.expectations,
+		m.running,
+		m.customMessageHandling != nil,
+	)
+}
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index 664b92c986..a186912b71 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"testing"
@@ -1194,3 +1195,45 @@ func Test_defaultCodec_Work(t *testing.T) {
 		})
 	}
 }
+
+func Test_defaultCodec_String(t *testing.T) {
+	type fields struct {
+		DefaultCodecRequirements      DefaultCodecRequirements
+		transportInstance             transports.TransportInstance
+		defaultIncomingMessageChannel chan spi.Message
+		expectations                  []spi.Expectation
+		running                       bool
+		customMessageHandling         func(codec DefaultCodecRequirements, message spi.Message) bool
+		log                           zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			want: "DefaultCodec{\n" +
+				"\tTransportInstance: %!s(<nil>),\n" +
+				"\tDefaultIncomingMessageChannel: 0 elements,\n" +
+				"\tExpectations: [],\n" +
+				"\trunning: false,\n" +
+				"\tcustomMessageHandling: false,\n" +
+				"}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m := &defaultCodec{
+				DefaultCodecRequirements:      tt.fields.DefaultCodecRequirements,
+				transportInstance:             tt.fields.transportInstance,
+				defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
+				expectations:                  tt.fields.expectations,
+				running:                       tt.fields.running,
+				customMessageHandling:         tt.fields.customMessageHandling,
+				log:                           tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, m.String(), "String()")
+		})
+	}
+}
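
A note on the expected strings above: "%!s(<nil>)" is what Go's fmt emits when a %s verb receives a nil interface value, so the want literals encode the nil transportInstance verbatim. A two-line illustration:

    package main

    import "fmt"

    func main() {
        var s fmt.Stringer // nil interface: there is no String() to call
        fmt.Printf("TransportInstance: %s\n", s) // TransportInstance: %!s(<nil>)
    }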
diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index 2d3a0f9fa5..67caedf5cd 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -21,6 +21,7 @@ package _default
 
 import (
 	"context"
+	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/tracer"
 	"github.com/rs/zerolog"
 	"runtime/debug"
@@ -359,6 +360,20 @@ func (d *defaultConnection) GetPlcValueHandler() spi.PlcValueHandler {
 	return d.valueHandler
 }
 
+func (d *defaultConnection) String() string {
+	return fmt.Sprintf("DefaultConnection{\n"+
+		"\tttl: %s,\n"+
+		"\tconnected: %t,\n"+
+		"\ttagHandler: %s,\n"+
+		"\tvalueHandler: %s,\n"+
+		"}",
+		d.defaultTtl,
+		d.connected,
+		d.tagHandler,
+		d.valueHandler,
+	)
+}
+
 func (m DefaultConnectionMetadata) GetConnectionAttributes() map[string]string {
 	return m.ConnectionAttributes
 }
diff --git a/plc4go/spi/default/DefaultConnection_test.go b/plc4go/spi/default/DefaultConnection_test.go
index c013b0690e..2e26883a1c 100644
--- a/plc4go/spi/default/DefaultConnection_test.go
+++ b/plc4go/spi/default/DefaultConnection_test.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/tracer"
+	"github.com/rs/zerolog"
 	"testing"
 	"time"
 
@@ -1319,3 +1320,42 @@ func Test_plcConnectionPingResult_GetErr(t *testing.T) {
 		})
 	}
 }
+
+func Test_defaultConnection_String(t *testing.T) {
+	type fields struct {
+		DefaultConnectionRequirements DefaultConnectionRequirements
+		defaultTtl                    time.Duration
+		connected                     bool
+		tagHandler                    spi.PlcTagHandler
+		valueHandler                  spi.PlcValueHandler
+		log                           zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			want: "DefaultConnection{\n" +
+				"\tttl: 0s,\n" +
+				"\tconnected: false,\n" +
+				"\ttagHandler: %!s(<nil>),\n" +
+				"\tvalueHandler: %!s(<nil>),\n" +
+				"}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			d := &defaultConnection{
+				DefaultConnectionRequirements: tt.fields.DefaultConnectionRequirements,
+				defaultTtl:                    tt.fields.defaultTtl,
+				connected:                     tt.fields.connected,
+				tagHandler:                    tt.fields.tagHandler,
+				valueHandler:                  tt.fields.valueHandler,
+				log:                           tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, d.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 22f8d20e5b..ae4135d60a 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/rs/zerolog"
 	"runtime/debug"
@@ -171,3 +172,15 @@ func (e *dynamicExecutor) Stop() {
 	e.dynamicWorkers.Wait()
 	e.log.Trace().Msg("stopped")
 }
+
+func (e *dynamicExecutor) String() string {
+	return fmt.Sprintf("dynamicExecutor{\n"+
+		"\texecutor: %s\n"+
+		"\tmaxNumberOfWorkers: %d\n"+
+		"\tcurrentNumberOfWorkers: %d\n"+
+		"}",
+		e.executor,
+		e.maxNumberOfWorkers,
+		e.currentNumberOfWorkers.Load(),
+	)
+}
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 1db92a5d7f..190fceb41d 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+	"github.com/stretchr/testify/assert"
 	"testing"
 	"time"
 )
@@ -168,3 +169,45 @@ func Test_dynamicExecutor_Stop(t *testing.T) {
 		})
 	}
 }
+
+func Test_dynamicExecutor_String(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			fields: fields{
+				executor:           &executor{},
+				maxNumberOfWorkers: 3,
+			},
+			want: "dynamicExecutor{\n" +
+				"\texecutor: executor{\n" +
+				"\trunning: false,\n" +
+				"\tshutdown: false,\n" +
+				"\tworker: [],\n" +
+				"\tqueueDepth: 0,\n" +
+				"\tworkItems: 0 elements,\n" +
+				"\ttraceWorkers: false,\n" +
+				"\n" +
+				"}\n" +
+				"\tmaxNumberOfWorkers: 3\n" +
+				"\tcurrentNumberOfWorkers: 0\n" +
+				"}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+			}
+			assert.Equalf(t, tt.want, e.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 79bddb6dc0..0e8e458903 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -21,6 +21,7 @@ package pool
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"sync/atomic"
 
@@ -127,3 +128,21 @@ func (e *executor) Close() error {
 func (e *executor) IsRunning() bool {
 	return e.running && !e.shutdown
 }
+
+func (e *executor) String() string {
+	return fmt.Sprintf("executor{\n"+
+		"\trunning: %t,\n"+
+		"\tshutdown: %t,\n"+
+		"\tworker: %s,\n"+
+		"\tqueueDepth: %d,\n"+
+		"\tworkItems: %d elements,\n"+
+		"\ttraceWorkers: %t,\n"+
+		"\n}",
+		e.running,
+		e.shutdown,
+		e.worker,
+		e.queueDepth,
+		len(e.workItems),
+		e.traceWorkers,
+	)
+}
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 2a43b1366e..577ce986c0 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -25,6 +25,7 @@ import (
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -411,3 +412,68 @@ func Test_executor_isTraceWorkers(t *testing.T) {
 		})
 	}
 }
+
+func Test_executor_String(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			fields: fields{
+				running:  true,
+				shutdown: true,
+				worker: []*worker{
+					{
+						id:           1,
+						shutdown:     atomic.Bool{},
+						interrupted:  atomic.Bool{},
+						hasEnded:     atomic.Bool{},
+						lastReceived: time.Time{},
+					},
+				},
+				queueDepth:   2,
+				traceWorkers: true,
+			},
+			want: "executor{\n" +
+				"\trunning: true,\n" +
+				"\tshutdown: true,\n" +
+				"\tworker: [worker{\n" +
+				"\tid: 1,\n" +
+				"\tshutdown: false,\n" +
+				"\tinterrupted: false,\n" +
+				"\thasEnded: false,\n" +
+				"\tlastReceived: 0001-01-01 00:00:00 +0000 UTC,\n" +
+				"}],\n" +
+				"\tqueueDepth: 2,\n" +
+				"\tworkItems: 0 elements,\n" +
+				"\ttraceWorkers: true,\n" +
+				"\n" +
+				"}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/workItem.go b/plc4go/spi/pool/workItem.go
index 942480bc1a..233df1315d 100644
--- a/plc4go/spi/pool/workItem.go
+++ b/plc4go/spi/pool/workItem.go
@@ -28,5 +28,13 @@ type workItem struct {
 }
 
 func (w workItem) String() string {
-	return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
+	return fmt.Sprintf("workItem{\n"+
+		"\twid: %d,\n"+
+		"\trunnable: %t,\n"+
+		"\tcompletionFuture: %s,\n"+
+		"}",
+		w.workItemId,
+		w.runnable != nil,
+		w.completionFuture,
+	)
 }
diff --git a/plc4go/spi/pool/workItem_test.go b/plc4go/spi/pool/workItem_test.go
index ebc4b30c5c..1477f69012 100644
--- a/plc4go/spi/pool/workItem_test.go
+++ b/plc4go/spi/pool/workItem_test.go
@@ -37,7 +37,11 @@ func Test_workItem_String(t *testing.T) {
 	}{
 		{
 			name: "Simple test",
-			want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
+			want: "workItem{\n" +
+				"\twid: 0,\n" +
+				"\trunnable: false,\n" +
+				"\tcompletionFuture: <nil>,\n" +
+				"}",
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 7530ed0ad9..77909cd1b6 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+	"fmt"
 	"runtime/debug"
 	"sync"
 	"sync/atomic"
@@ -94,3 +95,19 @@ func (w *worker) work() {
 	w.hasEnded.Store(true)
 	workerLog.Debug().Msg("setting to ended")
 }
+
+func (w *worker) String() string {
+	return fmt.Sprintf("worker{\n"+
+		"\tid: %d,\n"+
+		"\tshutdown: %v,\n"+
+		"\tinterrupted: %t,\n"+
+		"\thasEnded: %t,\n"+
+		"\tlastReceived: %s,\n"+
+		"}",
+		w.id,
+		w.shutdown.Load(),
+		w.interrupted.Load(),
+		w.hasEnded.Load(),
+		w.lastReceived,
+	)
+}
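
The worker keeps its shutdown/interrupted/hasEnded flags as sync/atomic.Bool values, so String() reads them through Load() instead of touching the fields directly. The idiom in isolation, with a hypothetical flagged type:

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    type flagged struct {
        shutdown atomic.Bool // zero value is false, usable without initialization
    }

    // String samples the flag via Load(), which stays safe while another
    // goroutine calls Store() concurrently.
    func (f *flagged) String() string {
        return fmt.Sprintf("flagged{shutdown: %t}", f.shutdown.Load())
    }

    func main() {
        f := &flagged{}
        f.shutdown.Store(true)
        fmt.Println(f) // flagged{shutdown: true}
    }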
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 4f604d2efb..31c0ece2c0 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -225,3 +225,29 @@ func Test_worker_work(t *testing.T) {
 		})
 	}
 }
+
+func Test_worker_String(t *testing.T) {
+	type fields struct {
+		id           int
+		lastReceived time.Time
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			want: "worker{\n\tid: 0,\n\tshutdown: false,\n\tinterrupted: false,\n\thasEnded: false,\n\tlastReceived: 0001-01-01 00:00:00 +0000 UTC,\n}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:           tt.fields.id,
+				lastReceived: tt.fields.lastReceived,
+			}
+			assert.Equalf(t, tt.want, w.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 8a93dc6f87..fe63035ebd 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -22,6 +22,7 @@ package transactions
 import (
 	"container/list"
 	"context"
+	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
 	"io"
@@ -60,7 +61,7 @@ type RequestTransactionManager interface {
 func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...options.WithOption) RequestTransactionManager {
 	_requestTransactionManager := &requestTransactionManager{
 		numberOfConcurrentRequests: numberOfConcurrentRequests,
-		transactionId:              0,
+		currentTransactionId:       0,
 		workLog:                    *list.New(),
 		executor:                   sharedExecutorInstance,
 
@@ -98,8 +99,8 @@ type requestTransactionManager struct {
 	// How many transactions are allowed to run at the same time?
 	numberOfConcurrentRequests int
 	// Assigns each request a Unique Transaction Id, especially important for failure handling
-	transactionId    int32
-	transactionMutex sync.RWMutex
+	currentTransactionId int32
+	transactionMutex     sync.RWMutex
 	// Important, this is a FIFO Queue for Fairness!
 	workLog      list.List
 	workLogMutex sync.RWMutex
@@ -162,9 +163,9 @@ func (r *requestTransactionManager) processWorklog() {
 func (r *requestTransactionManager) StartTransaction() RequestTransaction {
 	r.transactionMutex.Lock()
 	defer r.transactionMutex.Unlock()
-	currentTransactionId := r.transactionId
-	r.transactionId += 1
-	transactionLogger := r.log.With().Int32("transactionId", currentTransactionId).Logger()
+	currentTransactionId := r.currentTransactionId
+	r.currentTransactionId += 1
+	transactionLogger := r.log.With().Int32("currentTransactionId", currentTransactionId).Logger()
 	if !r.traceTransactionManagerTransactions {
 		transactionLogger = zerolog.Nop()
 	}
@@ -246,3 +247,23 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 	r.runningRequests = nil
 	return r.executor.Close()
 }
+
+func (r *requestTransactionManager) String() string {
+	return fmt.Sprintf("RequestTransactionManager{\n"+
+		"\trunningRequests: %s,\n"+
+		"\tnumberOfConcurrentRequests: %d,\n"+
+		"\tcurrentTransactionId: %d,\n"+
+		"\tworkLog: %d elements,\n"+
+		"\texecutor: %s,\n"+
+		"\tshutdown: %t,\n"+
+		"\ttraceTransactionManagerTransactions: %t,\n"+
+		"}",
+		r.runningRequests,
+		r.numberOfConcurrentRequests,
+		r.currentTransactionId,
+		r.workLog.Len(),
+		r.executor,
+		r.shutdown,
+		r.traceTransactionManagerTransactions,
+	)
+}
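
Besides the Stringer, this file renames transactionId to currentTransactionId, which reads better for a field that always holds the next id to hand out: StartTransaction copies it under transactionMutex and post-increments. The guarded-counter idiom, reduced to a sketch with hypothetical names:

    package main

    import (
        "fmt"
        "sync"
    )

    type manager struct {
        mu                   sync.Mutex
        currentTransactionId int32
    }

    // start allocates a fresh id; the mutex keeps two concurrent callers
    // from being handed the same value.
    func (m *manager) start() int32 {
        m.mu.Lock()
        defer m.mu.Unlock()
        id := m.currentTransactionId
        m.currentTransactionId++
        return id
    }

    func main() {
        m := &manager{}
        fmt.Println(m.start(), m.start()) // 0 1
    }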
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 4445edad1e..4925ce7790 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -106,7 +106,7 @@ func Test_requestTransactionManager_SetNumberOfConcurrentRequests(t *testing.T)
 	type fields struct {
 		runningRequests            []*requestTransaction
 		numberOfConcurrentRequests int
-		transactionId              int32
+		currentTransactionId       int32
 		workLog                    list.List
 		executor                   pool.Executor
 	}
@@ -135,7 +135,7 @@ func Test_requestTransactionManager_SetNumberOfConcurrentRequests(t *testing.T)
 			r := &requestTransactionManager{
 				runningRequests:            tt.fields.runningRequests,
 				numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
-				transactionId:              tt.fields.transactionId,
+				currentTransactionId:       tt.fields.currentTransactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
 			}
@@ -148,7 +148,7 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 	type fields struct {
 		runningRequests                     []*requestTransaction
 		numberOfConcurrentRequests          int
-		transactionId                       int32
+		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
 		shutdown                            bool
@@ -194,7 +194,7 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 			r := &requestTransactionManager{
 				runningRequests:                     tt.fields.runningRequests,
 				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
-				transactionId:                       tt.fields.transactionId,
+				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
 				shutdown:                            tt.fields.shutdown,
@@ -212,7 +212,7 @@ func Test_requestTransactionManager_endRequest(t *testing.T) {
 	type fields struct {
 		runningRequests            []*requestTransaction
 		numberOfConcurrentRequests int
-		transactionId              int32
+		currentTransactionId       int32
 		workLog                    list.List
 		executor                   pool.Executor
 	}
@@ -249,7 +249,7 @@ func Test_requestTransactionManager_endRequest(t *testing.T) {
 			r := &requestTransactionManager{
 				runningRequests:            tt.fields.runningRequests,
 				numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
-				transactionId:              tt.fields.transactionId,
+				currentTransactionId:       tt.fields.currentTransactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
 			}
@@ -264,7 +264,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 	type fields struct {
 		runningRequests            []*requestTransaction
 		numberOfConcurrentRequests int
-		transactionId              int32
+		currentTransactionId       int32
 		workLog                    list.List
 		executor                   pool.Executor
 		log                        zerolog.Logger
@@ -304,7 +304,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 			r := &requestTransactionManager{
 				runningRequests:            tt.fields.runningRequests,
 				numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
-				transactionId:              tt.fields.transactionId,
+				currentTransactionId:       tt.fields.currentTransactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
 				log:                        tt.fields.log,
@@ -320,7 +320,7 @@ func Test_requestTransactionManager_getNumberOfActiveRequests(t *testing.T) {
 	type fields struct {
 		runningRequests            []*requestTransaction
 		numberOfConcurrentRequests int
-		transactionId              int32
+		currentTransactionId       int32
 		workLog                    list.List
 		executor                   pool.Executor
 	}
@@ -338,7 +338,7 @@ func Test_requestTransactionManager_getNumberOfActiveRequests(t *testing.T) {
 			r := &requestTransactionManager{
 				runningRequests:            tt.fields.runningRequests,
 				numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
-				transactionId:              tt.fields.transactionId,
+				currentTransactionId:       tt.fields.currentTransactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
 			}
@@ -353,7 +353,7 @@ func Test_requestTransactionManager_processWorklog(t *testing.T) {
 	type fields struct {
 		runningRequests            []*requestTransaction
 		numberOfConcurrentRequests int
-		transactionId              int32
+		currentTransactionId       int32
 		workLog                    list.List
 		executor                   pool.Executor
 	}
@@ -401,7 +401,7 @@ func Test_requestTransactionManager_processWorklog(t *testing.T) {
 			r := &requestTransactionManager{
 				runningRequests:            tt.fields.runningRequests,
 				numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
-				transactionId:              tt.fields.transactionId,
+				currentTransactionId:       tt.fields.currentTransactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
 			}
@@ -414,7 +414,7 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) {
 	type fields struct {
 		runningRequests            []*requestTransaction
 		numberOfConcurrentRequests int
-		transactionId              int32
+		currentTransactionId       int32
 		workLog                    list.List
 		executor                   pool.Executor
 	}
@@ -442,7 +442,7 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) {
 			r := &requestTransactionManager{
 				runningRequests:            tt.fields.runningRequests,
 				numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
-				transactionId:              tt.fields.transactionId,
+				currentTransactionId:       tt.fields.currentTransactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
 			}
@@ -455,7 +455,7 @@ func Test_requestTransactionManager_Close(t *testing.T) {
 	type fields struct {
 		runningRequests                     []*requestTransaction
 		numberOfConcurrentRequests          int
-		transactionId                       int32
+		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
 		shutdown                            bool
@@ -486,7 +486,7 @@ func Test_requestTransactionManager_Close(t *testing.T) {
 			r := &requestTransactionManager{
 				runningRequests:                     tt.fields.runningRequests,
 				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
-				transactionId:                       tt.fields.transactionId,
+				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
 				shutdown:                            tt.fields.shutdown,
@@ -502,7 +502,7 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
 	type fields struct {
 		runningRequests                     []*requestTransaction
 		numberOfConcurrentRequests          int
-		transactionId                       int32
+		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
 		shutdown                            bool
@@ -566,7 +566,7 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
 			r := &requestTransactionManager{
 				runningRequests:                     tt.fields.runningRequests,
 				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
-				transactionId:                       tt.fields.transactionId,
+				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
 				shutdown:                            tt.fields.shutdown,
@@ -577,3 +577,80 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
 		})
 	}
 }
+
+func Test_requestTransactionManager_String(t *testing.T) {
+	type fields struct {
+		runningRequests                     []*requestTransaction
+		numberOfConcurrentRequests          int
+		currentTransactionId                int32
+		workLog                             list.List
+		executor                            pool.Executor
+		shutdown                            bool
+		traceTransactionManagerTransactions bool
+		log                                 zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			fields: fields{
+				runningRequests: []*requestTransaction{
+					{
+						transactionId: 2,
+					},
+				},
+				numberOfConcurrentRequests: 3,
+				currentTransactionId:       4,
+				workLog: func() list.List {
+					v := list.List{}
+					v.PushBack(nil)
+					return v
+				}(),
+				executor:                            pool.NewFixedSizeExecutor(1, 1),
+				shutdown:                            true,
+				traceTransactionManagerTransactions: true,
+			},
+			want: "RequestTransactionManager{\n" +
+				"\trunningRequests: [Transaction{tid:2}],\n" +
+				"\tnumberOfConcurrentRequests: 3,\n" +
+				"\tcurrentTransactionId: 4,\n" +
+				"\tworkLog: 1 elements,\n" +
+				"\texecutor: executor{\n" +
+				"\trunning: false,\n" +
+				"\tshutdown: false,\n" +
+				"\tworker: [worker{\n" +
+				"\tid: 0,\n" +
+				"\tshutdown: false,\n" +
+				"\tinterrupted: false,\n" +
+				"\thasEnded: false,\n" +
+				"\tlastReceived: 0001-01-01 00:00:00 +0000 UTC,\n" +
+				"}],\n" +
+				"\tqueueDepth: 1,\n" +
+				"\tworkItems: 0 elements,\n" +
+				"\ttraceWorkers: false,\n" +
+				"\n" +
+				"},\n" +
+				"\tshutdown: true,\n" +
+				"\ttraceTransactionManagerTransactions: true,\n" +
+				"}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			r := &requestTransactionManager{
+				runningRequests:                     tt.fields.runningRequests,
+				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
+				currentTransactionId:                tt.fields.currentTransactionId,
+				workLog:                             tt.fields.workLog,
+				executor:                            tt.fields.executor,
+				shutdown:                            tt.fields.shutdown,
+				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+				log:                                 tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, r.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/spi/transactions/completedFuture.go b/plc4go/spi/transactions/completedFuture.go
index ac8227f579..9d5ef8d9ab 100644
--- a/plc4go/spi/transactions/completedFuture.go
+++ b/plc4go/spi/transactions/completedFuture.go
@@ -19,7 +19,10 @@
 
 package transactions
 
-import "context"
+import (
+	"context"
+	"fmt"
+)
 
 type completedFuture struct {
 	err error
@@ -32,3 +35,7 @@ func (c completedFuture) AwaitCompletion(_ context.Context) error {
 func (completedFuture) Cancel(_ bool, _ error) {
 	// No op
 }
+
+func (c completedFuture) String() string {
+	return fmt.Sprintf("completedFuture{\n\terr: %v,\n}", c.err)
+}
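
Once a type carries a String() method, every %s and %v verb picks it up automatically, which is what makes these additions pay off in trace output. Illustrated with a simplified copy of completedFuture (assumption: reduced to the one field that matters here):

    package main

    import "fmt"

    type completedFuture struct{ err error }

    func (c completedFuture) String() string {
        return fmt.Sprintf("completedFuture{\n\terr: %v,\n}", c.err)
    }

    func main() {
        // %v and %s both route through the Stringer now.
        fmt.Printf("%v\n", completedFuture{})
    }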
diff --git a/plc4go/spi/transactions/completedFuture_test.go b/plc4go/spi/transactions/completedFuture_test.go
new file mode 100644
index 0000000000..c875decec7
--- /dev/null
+++ b/plc4go/spi/transactions/completedFuture_test.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package transactions
+
+import (
+	"context"
+	"fmt"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func Test_completedFuture_AwaitCompletion(t *testing.T) {
+	type fields struct {
+		err error
+	}
+	type args struct {
+		in0 context.Context
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{
+			name:    "does nothing",
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c := completedFuture{
+				err: tt.fields.err,
+			}
+			tt.wantErr(t, c.AwaitCompletion(tt.args.in0), fmt.Sprintf("AwaitCompletion(%v)", tt.args.in0))
+		})
+	}
+}
+
+func Test_completedFuture_Cancel(t *testing.T) {
+	type fields struct {
+		err error
+	}
+	type args struct {
+		in0 bool
+		in1 error
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+	}{
+		{
+			name: "does nothing",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			co := completedFuture{
+				err: tt.fields.err,
+			}
+			co.Cancel(tt.args.in0, tt.args.in1)
+		})
+	}
+}
+
+func Test_completedFuture_String(t *testing.T) {
+	type fields struct {
+		err error
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "gives the error",
+			want: "completedFuture{\n\terr: <nil>,\n}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c := completedFuture{
+				err: tt.fields.err,
+			}
+			assert.Equalf(t, tt.want, c.String(), "String()")
+		})
+	}
+}


[plc4x] 04/04: feat(plc4go/cbus): added more Stringer implementations

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 54221760969bf088eb07df4eb8a46f933dff6ca0
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 14:17:04 2023 +0200

    feat(plc4go/cbus): added more Stringer implementations
---
 plc4go/internal/cbus/Configuration.go      |  5 ++
 plc4go/internal/cbus/Configuration_test.go | 60 +++++++++++++++++++
 plc4go/internal/cbus/Connection.go         | 46 ++++++++++++--
 plc4go/internal/cbus/Connection_test.go    | 11 +++-
 plc4go/internal/cbus/DriverContext.go      |  6 ++
 plc4go/internal/cbus/MessageCodec.go       | 21 +++++++
 plc4go/internal/cbus/MessageCodec_test.go  | 49 +++++++++++++++
 plc4go/internal/cbus/Subscriber.go         | 76 ++++++++++++-----------
 plc4go/internal/cbus/Subscriber_test.go    | 96 ++++++++++++------------------
 9 files changed, 270 insertions(+), 100 deletions(-)
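
Two of the types below, Configuration and DriverContext, skip the hand-written field list and delegate to fmt's %#v verb, which renders a struct in Go syntax. %#v consults GoStringer rather than Stringer, so the delegation cannot recurse into String() itself. A minimal sketch with a hypothetical struct:

    package main

    import "fmt"

    // config is a stand-in for a flat, all-exported struct like cbus.Configuration.
    type config struct {
        Srchk bool
        App1  byte
    }

    // String delegates to %#v; %#v ignores Stringer, so this does not recurse.
    func (c config) String() string {
        return fmt.Sprintf("%#v", c)
    }

    func main() {
        fmt.Println(config{Srchk: true, App1: 2}) // main.config{Srchk:true, App1:0x2}
    }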

diff --git a/plc4go/internal/cbus/Configuration.go b/plc4go/internal/cbus/Configuration.go
index 5655c87473..bd02271184 100644
--- a/plc4go/internal/cbus/Configuration.go
+++ b/plc4go/internal/cbus/Configuration.go
@@ -20,6 +20,7 @@
 package cbus
 
 import (
+	"fmt"
 	"github.com/rs/zerolog"
 	"reflect"
 	"strconv"
@@ -98,3 +99,7 @@ func getFromOptions(localLog zerolog.Logger, options map[string][]string, key st
 	}
 	return ""
 }
+
+func (c Configuration) String() string {
+	return fmt.Sprintf("%#v", c)
+}
diff --git a/plc4go/internal/cbus/Configuration_test.go b/plc4go/internal/cbus/Configuration_test.go
index 733d56ce27..17779239a8 100644
--- a/plc4go/internal/cbus/Configuration_test.go
+++ b/plc4go/internal/cbus/Configuration_test.go
@@ -201,3 +201,63 @@ func Test_getFromOptions(t *testing.T) {
 		})
 	}
 }
+
+func TestConfiguration_String(t *testing.T) {
+	type fields struct {
+		Srchk                 bool
+		Exstat                bool
+		Pun                   bool
+		LocalSal              bool
+		Pcn                   bool
+		Idmon                 bool
+		Monitor               bool
+		Smart                 bool
+		XonXoff               bool
+		Connect               bool
+		MonitoredApplication1 byte
+		MonitoredApplication2 byte
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			fields: fields{
+				Srchk:                 true,
+				Exstat:                true,
+				Pun:                   true,
+				LocalSal:              true,
+				Pcn:                   true,
+				Idmon:                 true,
+				Monitor:               true,
+				Smart:                 true,
+				XonXoff:               true,
+				Connect:               true,
+				MonitoredApplication1: 2,
+				MonitoredApplication2: 3,
+			},
+			want: "cbus.Configuration{Srchk:true, Exstat:true, Pun:true, LocalSal:true, Pcn:true, Idmon:true, Monitor:true, Smart:true, XonXoff:true, Connect:true, MonitoredApplication1:0x2, MonitoredApplication2:0x3}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c := Configuration{
+				Srchk:                 tt.fields.Srchk,
+				Exstat:                tt.fields.Exstat,
+				Pun:                   tt.fields.Pun,
+				LocalSal:              tt.fields.LocalSal,
+				Pcn:                   tt.fields.Pcn,
+				Idmon:                 tt.fields.Idmon,
+				Monitor:               tt.fields.Monitor,
+				Smart:                 tt.fields.Smart,
+				XonXoff:               tt.fields.XonXoff,
+				Connect:               tt.fields.Connect,
+				MonitoredApplication1: tt.fields.MonitoredApplication1,
+				MonitoredApplication2: tt.fields.MonitoredApplication2,
+			}
+			assert.Equalf(t, tt.want, c.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 9d65646a7a..393007d21c 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -26,6 +26,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/tracer"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
 	"runtime/debug"
 	"sync"
 	"time"
@@ -57,6 +58,10 @@ func (t *AlphaGenerator) getAndIncrement() byte {
 	return result
 }
 
+func (t *AlphaGenerator) String() string {
+	return fmt.Sprintf("AlphaGenerator(currentAlpha: %c)", t.currentAlpha)
+}
+
 type Connection struct {
 	_default.DefaultConnection
 	alphaGenerator AlphaGenerator
@@ -166,7 +171,11 @@ func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
 }
 
 func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
-	return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c, options.WithCustomLogger(c.log)))
+	return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
+		c.GetPlcTagHandler(),
+		c.GetPlcValueHandler(),
+		NewSubscriber(c.addSubscriber, options.WithCustomLogger(c.log)),
+	)
 }
 
 func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
@@ -189,7 +198,28 @@ func (c *Connection) addSubscriber(subscriber *Subscriber) {
 }
 
 func (c *Connection) String() string {
-	return fmt.Sprintf("cbus.Connection")
+	return fmt.Sprintf(
+		"cbus.Connection{\n"+
+			"\tDefaultConnection: %s,\n"+
+			"\tAlphaGenerator: %s\n"+
+			"\tMessageCodec: %s\n"+
+			"\tsubscribers: %s\n"+
+			"\ttm: %s\n"+
+			"\tconfiguration: %s\n"+
+			"\tdriverContext: %s\n"+
+			"\tconnectionId: %s\n"+
+			"\ttracer: %s\n"+
+			"}",
+		c.DefaultConnection,
+		&c.alphaGenerator,
+		c.messageCodec,
+		c.subscribers,
+		c.tm,
+		c.configuration,
+		c.driverContext,
+		c.connectionId,
+		c.tracer,
+	)
 }
 
 func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
@@ -235,12 +265,16 @@ func (c *Connection) startSubscriptionHandler() {
 		c.log.Debug().Msg("SAL handler stated")
 		for c.IsConnected() {
 			for monitoredSal := range c.messageCodec.monitoredSALs {
+				handled := false
 				for _, subscriber := range c.subscribers {
 					if ok := subscriber.handleMonitoredSAL(monitoredSal); ok {
 						c.log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
-						continue
+						handled = true
 					}
 				}
+				if !handled {
+					log.Debug().Msgf("SAL was not handled:\n%s", monitoredSal)
+				}
 			}
 		}
 		c.log.Info().Msg("Ending SAL handler")
@@ -255,12 +289,16 @@ func (c *Connection) startSubscriptionHandler() {
 		c.log.Debug().Msg("default MMI started")
 		for c.IsConnected() {
 			for calReply := range c.messageCodec.monitoredMMIs {
+				handled := false
 				for _, subscriber := range c.subscribers {
 					if ok := subscriber.handleMonitoredMMI(calReply); ok {
 						c.log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
-						continue
+						handled = true
 					}
 				}
+				if !handled {
+					log.Debug().Msgf("MMI was not handled:\n%s", calReply)
+				}
 			}
 		}
 		c.log.Info().Msg("Ending MMI handler")
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 8feae638e5..b96d950f00 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -539,7 +539,17 @@ func TestConnection_String(t *testing.T) {
 	}{
 		{
 			name: "a string",
-			want: "cbus.Connection",
+			want: "cbus.Connection{\n" +
+				"\tDefaultConnection: %!s(<nil>),\n" +
+				"\tAlphaGenerator: AlphaGenerator(currentAlpha: \x00)\n" +
+				"\tMessageCodec: <nil>\n" +
+				"\tsubscribers: []\n" +
+				"\ttm: %!s(<nil>)\n" +
+				"\tconfiguration: cbus.Configuration{Srchk:false, Exstat:false, Pun:false, LocalSal:false, Pcn:false, Idmon:false, Monitor:false, Smart:false, XonXoff:false, Connect:false, MonitoredApplication1:0x0, MonitoredApplication2:0x0}\n\tdriverContext: cbus.DriverContext{awaitSetupComplete:false, awaitDisconnectComplete:false}\n" +
+				"\tconnectionId: \n" +
+				"\ttracer: <nil>\n" +
+				"}",
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/internal/cbus/DriverContext.go b/plc4go/internal/cbus/DriverContext.go
index f0ed07b1f9..7c4307c960 100644
--- a/plc4go/internal/cbus/DriverContext.go
+++ b/plc4go/internal/cbus/DriverContext.go
@@ -19,6 +19,8 @@
 
 package cbus
 
+import "fmt"
+
 type DriverContext struct {
 	awaitSetupComplete      bool
 	awaitDisconnectComplete bool
@@ -27,3 +29,7 @@ type DriverContext struct {
 func NewDriverContext(_ Configuration) DriverContext {
 	return DriverContext{}
 }
+
+func (d DriverContext) String() string {
+	return fmt.Sprintf("%#v", d)
+}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 1c4c3e3743..b8119c7d8f 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -21,6 +21,7 @@ package cbus
 
 import (
 	"bufio"
+	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/rs/zerolog"
 	"hash/crc32"
@@ -322,6 +323,26 @@ lookingForTheEnd:
 	return cBusMessage, nil
 }
 
+func (m *MessageCodec) String() string {
+	return fmt.Sprintf("MessageCodec{\n"+
+		"\tDefaultCodec: %s,\n"+
+		"\trequestContext: %s,\n"+
+		"\tcbusOptions: %s,\n"+
+		"\tmonitoredMMIs: %d elements,\n"+
+		"\tmonitoredSALs: %d elements,\n"+
+		"\tlastPackageHash: %d,\n"+
+		"\thashEncountered: %d,\n"+
+		"}",
+		m.DefaultCodec,
+		m.requestContext,
+		m.cbusOptions,
+		len(m.monitoredMMIs),
+		len(m.monitoredSALs),
+		m.lastPackageHash,
+		m.hashEncountered,
+	)
+}
+
 func extractMMIAndSAL(log zerolog.Logger) _default.CustomMessageHandler {
 	return func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
 		log.Trace().Msgf("Custom handling message:\n%s", message)
diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go
index 2c11659dfb..68abdb02e0 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
+	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"testing"
 )
@@ -878,3 +879,51 @@ func Test_extractMMIAndSAL(t *testing.T) {
 		})
 	}
 }
+
+func TestMessageCodec_String(t *testing.T) {
+	type fields struct {
+		DefaultCodec                  _default.DefaultCodec
+		requestContext                readWriteModel.RequestContext
+		cbusOptions                   readWriteModel.CBusOptions
+		monitoredMMIs                 chan readWriteModel.CALReply
+		monitoredSALs                 chan readWriteModel.MonitoredSAL
+		lastPackageHash               uint32
+		hashEncountered               uint
+		currentlyReportedServerErrors uint
+		log                           zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "string it",
+			want: "MessageCodec{\n" +
+				"\tDefaultCodec: %!s(<nil>),\n" +
+				"\trequestContext: %!s(<nil>),\n" +
+				"\tcbusOptions: %!s(<nil>),\n" +
+				"\tmonitoredMMIs: 0 elements,\n" +
+				"\tmonitoredSALs: 0 elements,\n" +
+				"\tlastPackageHash: 0,\n" +
+				"\thashEncountered: 0,\n" +
+				"}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m := &MessageCodec{
+				DefaultCodec:                  tt.fields.DefaultCodec,
+				requestContext:                tt.fields.requestContext,
+				cbusOptions:                   tt.fields.cbusOptions,
+				monitoredMMIs:                 tt.fields.monitoredMMIs,
+				monitoredSALs:                 tt.fields.monitoredSALs,
+				lastPackageHash:               tt.fields.lastPackageHash,
+				hashEncountered:               tt.fields.hashEncountered,
+				currentlyReportedServerErrors: tt.fields.currentlyReportedServerErrors,
+				log:                           tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, m.String(), "String()")
+		})
+	}
+}
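
The Subscriber diff below is the flip side of the SubscriptionRequestBuilder change earlier: instead of holding the whole *Connection, the Subscriber now keeps only the addSubscriber callback it actually needs, cutting the struct-level dependency between the two types. Reduced to a sketch with hypothetical names:

    package main

    import "fmt"

    // subscriber keeps only the callback it needs, not its owner.
    type subscriber struct {
        register func(*subscriber)
    }

    func newSubscriber(register func(*subscriber)) *subscriber {
        return &subscriber{register: register}
    }

    // connection still owns the subscriber list; it hands out addSubscriber
    // without exposing anything else about itself.
    type connection struct {
        subscribers []*subscriber
    }

    func (c *connection) addSubscriber(s *subscriber) {
        c.subscribers = append(c.subscribers, s)
    }

    func main() {
        c := &connection{}
        s := newSubscriber(c.addSubscriber)
        s.register(s) // the subscriber enrolls itself without knowing connection
        fmt.Println(len(c.subscribers)) // 1
    }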
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index e0fef87a17..0454ac9ebb 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -37,22 +37,22 @@ import (
 )
 
 type Subscriber struct {
-	connection *Connection
-	consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+	consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+	addSubscriber func(subscriber *Subscriber)
 
 	log zerolog.Logger
 }
 
-func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
+func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...options.WithOption) *Subscriber {
 	return &Subscriber{
-		connection: connection,
-		consumers:  make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
+		addSubscriber: addSubscriber,
+		consumers:     make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
 
 		log: options.ExtractCustomLogger(_options...),
 	}
 }
 
-func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
 	result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
 	go func() {
 		defer func() {
@@ -63,14 +63,14 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
 		internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
 
 		// Add this subscriber to the connection.
-		m.connection.addSubscriber(m)
+		s.addSubscriber(s)
 
 		// Just populate all requests with an OK
 		responseCodes := map[string]apiModel.PlcResponseCode{}
 		subscriptionValues := make(map[string]apiModel.PlcSubscriptionHandle)
 		for _, tagName := range internalPlcSubscriptionRequest.GetTagNames() {
 			responseCodes[tagName] = apiModel.PlcResponseCode_OK
-			subscriptionValues[tagName] = NewSubscriptionHandle(m, tagName, internalPlcSubscriptionRequest.GetTag(tagName), internalPlcSubscriptionRequest.GetType(tagName), internalPlcSubscriptionRequest.GetInterval(tagName))
+			subscriptionValues[tagName] = NewSubscriptionHandle(s, tagName, internalPlcSubscriptionRequest.GetTag(tagName), internalPlcSubscriptionRequest.GetType(tagName), internalPlcSubscriptionRequest.GetInterval(tagName))
 		}
 
 		result <- spiModel.NewDefaultPlcSubscriptionRequestResult(
@@ -79,7 +79,7 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
 				subscriptionRequest,
 				responseCodes,
 				subscriptionValues,
-				options.WithCustomLogger(m.log),
+				options.WithCustomLogger(s.log),
 			),
 			nil,
 		)
@@ -87,7 +87,7 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
 	return result
 }
 
-func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+func (s *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
 	// TODO: handle context
 	result := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
 	result <- spiModel.NewDefaultPlcUnsubscriptionRequestResult(unsubscriptionRequest, nil, errors.New("Not Implemented"))
@@ -100,7 +100,7 @@ func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiM
 	return result
 }
 
-func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
+func (s *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 	var unitAddressString string
 	switch calReply := calReply.(type) {
 	case readWriteModel.CALReplyLongExactly:
@@ -119,18 +119,18 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
 	}
 	calData := calReply.GetCalData()
 	handled := false
-	for registration, consumer := range m.consumers {
+	for registration, consumer := range s.consumers {
 		for _, subscriptionHandle := range registration.GetSubscriptionHandles() {
-			handled = handled || m.offerMMI(unitAddressString, calData, subscriptionHandle.(*SubscriptionHandle), consumer)
+			handled = handled || s.offerMMI(unitAddressString, calData, subscriptionHandle.(*SubscriptionHandle), consumer)
 		}
 	}
 	return handled
 }
 
-func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.CALData, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
+func (s *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.CALData, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
 	tag, ok := subscriptionHandle.tag.(*mmiMonitorTag)
 	if !ok {
-		m.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
+		s.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
 		return false
 	}
 
@@ -146,7 +146,7 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
 	if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
 		unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
 		if !strings.HasSuffix(unitAddressString, unitSuffix) {
-			m.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
+			s.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
 			return false
 		}
 	}
@@ -228,12 +228,12 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
 			plcValues[tagName] = spiValues.NewPlcList(plcListValues)
 		}
 	default:
-		m.log.Error().Msgf("Unmapped type %T", calData)
+		s.log.Error().Msgf("Unmapped type %T", calData)
 		return false
 	}
 	if application := tag.GetApplication(); application != nil {
 		if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
-			m.log.Debug().Msgf("Current application id %s  doesn't match actual id %s", unitAddressString, actualApplicationIdString)
+			s.log.Debug().Msgf("Current application id %s  doesn't match actual id %s", unitAddressString, actualApplicationIdString)
 			return false
 		}
 	}
@@ -249,20 +249,20 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
 	return true
 }
 
-func (m *Subscriber) handleMonitoredSAL(sal readWriteModel.MonitoredSAL) bool {
+func (s *Subscriber) handleMonitoredSAL(sal readWriteModel.MonitoredSAL) bool {
 	handled := false
-	for registration, consumer := range m.consumers {
+	for registration, consumer := range s.consumers {
 		for _, subscriptionHandle := range registration.GetSubscriptionHandles() {
-			handled = handled || m.offerSAL(sal, subscriptionHandle.(*SubscriptionHandle), consumer)
+			handled = handled || s.offerSAL(sal, subscriptionHandle.(*SubscriptionHandle), consumer)
 		}
 	}
 	return handled
 }
 
-func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
+func (s *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
 	tag, ok := subscriptionHandle.tag.(*salMonitorTag)
 	if !ok {
-		m.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
+		s.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
 		return false
 	}
 	tags := map[string]apiModel.PlcTag{}
@@ -306,7 +306,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
 		unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
 		if !strings.HasSuffix(unitAddressString, unitSuffix) {
-			m.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
+			s.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
 			return false
 		}
 	}
@@ -314,7 +314,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 
 	if application := tag.GetApplication(); application != nil {
 		if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
-			m.log.Debug().Msgf("Current application id %s  doesn't match actual id %s", unitAddressString, actualApplicationIdString)
+			s.log.Debug().Msgf("Current application id %s doesn't match actual id %s", applicationString, actualApplicationIdString)
 			return false
 		}
 	}
@@ -336,7 +336,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	case readWriteModel.SALDataErrorReportingExactly:
 		commandTypeGetter = salData.GetErrorReportingData().GetCommandType()
 	case readWriteModel.SALDataFreeUsageExactly:
-		m.log.Info().Msg("Unknown command type")
+		s.log.Info().Msg("Unknown command type")
 	case readWriteModel.SALDataHeatingExactly:
 		commandTypeGetter = salData.GetHeatingData().GetCommandType()
 	case readWriteModel.SALDataHvacActuatorExactly:
@@ -354,9 +354,9 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	case readWriteModel.SALDataPoolsSpasPondsFountainsControlExactly:
 		commandTypeGetter = salData.GetPoolsSpaPondsFountainsData().GetCommandType()
 	case readWriteModel.SALDataReservedExactly:
-		m.log.Info().Msg("Unknown command type")
+		s.log.Info().Msg("Unknown command type")
 	case readWriteModel.SALDataRoomControlSystemExactly:
-		m.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
+		s.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
 	case readWriteModel.SALDataSecurityExactly:
 		commandTypeGetter = salData.GetSecurityData().GetCommandType()
 	case readWriteModel.SALDataTelephonyStatusAndControlExactly:
@@ -364,13 +364,13 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	case readWriteModel.SALDataTemperatureBroadcastExactly:
 		commandTypeGetter = salData.GetTemperatureBroadcastData().GetCommandType()
 	case readWriteModel.SALDataTestingExactly:
-		m.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
+		s.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
 	case readWriteModel.SALDataTriggerControlExactly:
 		commandTypeGetter = salData.GetTriggerControlData().GetCommandType()
 	case readWriteModel.SALDataVentilationExactly:
 		commandTypeGetter = salData.GetVentilationData().GetCommandType()
 	default:
-		m.log.Error().Msgf("Unmapped type %T", salData)
+		s.log.Error().Msgf("Unmapped type %T", salData)
 	}
 	commandType := "Unknown"
 	if commandTypeGetter != nil {
@@ -383,7 +383,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	rbvb := spiValues.NewWriteBufferPlcValueBased()
 	err := salData.SerializeWithWriteBuffer(context.Background(), rbvb)
 	if err != nil {
-		m.log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
+		s.log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
 		plcValues[tagName] = spiValues.NewPlcSTRING(fmt.Sprintf("%s", salData))
 	} else {
 		plcValues[tagName] = rbvb.GetPlcValue()
@@ -397,12 +397,16 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
 	return true
 }
 
-func (m *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
-	consumerRegistration := spiModel.NewDefaultPlcConsumerRegistration(m, consumer, handles...)
-	m.consumers[consumerRegistration.(*spiModel.DefaultPlcConsumerRegistration)] = consumer
+func (s *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
+	consumerRegistration := spiModel.NewDefaultPlcConsumerRegistration(s, consumer, handles...)
+	s.consumers[consumerRegistration.(*spiModel.DefaultPlcConsumerRegistration)] = consumer
 	return consumerRegistration
 }
 
-func (m *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
-	delete(m.consumers, registration.(*spiModel.DefaultPlcConsumerRegistration))
+func (s *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
+	delete(s.consumers, registration.(*spiModel.DefaultPlcConsumerRegistration))
+}
+
+func (s *Subscriber) String() string {
+	return fmt.Sprintf("cbus.Subscriber{\n\tconsumers: %d elements\n}", len(s.consumers))
 }
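
A minimal, hypothetical usage sketch of the consumer registration round-trip on this Subscriber; the handle source, the consumer body and NewSubscriber's nil callback are illustrative assumptions, not code from this commit:

    subscriber := NewSubscriber(nil) // assumed: no addSubscriber callback needed here
    var handles []apiModel.PlcSubscriptionHandle // assumed: obtained from a prior Subscribe call
    consumer := func(event apiModel.PlcSubscriptionEvent) {
        // react to incoming MMI/SAL subscription events here
    }
    registration := subscriber.Register(consumer, handles)
    // ... events are delivered to consumer ...
    subscriber.Unregister(registration)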
diff --git a/plc4go/internal/cbus/Subscriber_test.go b/plc4go/internal/cbus/Subscriber_test.go
index d5d794c950..b36f7ae481 100644
--- a/plc4go/internal/cbus/Subscriber_test.go
+++ b/plc4go/internal/cbus/Subscriber_test.go
@@ -21,9 +21,6 @@ package cbus
 
 import (
 	"context"
-	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/apache/plc4x/plc4go/spi/testutils"
-	"github.com/apache/plc4x/plc4go/spi/utils"
 	"testing"
 	"time"
 
@@ -36,7 +33,7 @@ import (
 
 func TestNewSubscriber(t *testing.T) {
 	type args struct {
-		connection *Connection
+		addSubscriber func(subscriber *Subscriber)
 	}
 	tests := []struct {
 		name string
@@ -52,15 +49,15 @@ func TestNewSubscriber(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			assert.Equalf(t, tt.want, NewSubscriber(tt.args.connection), "NewSubscriber(%v)", tt.args.connection)
+			assert.Equalf(t, tt.want, NewSubscriber(tt.args.addSubscriber), "NewSubscriber(%t)", tt.args.addSubscriber != nil)
 		})
 	}
 }
 
 func TestSubscriber_Subscribe(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		in0                 context.Context
@@ -80,28 +77,9 @@ func TestSubscriber_Subscribe(t *testing.T) {
 				subscriptionRequest: spiModel.NewDefaultPlcSubscriptionRequest(nil, []string{"blub"}, map[string]apiModel.PlcTag{"blub": NewMMIMonitorTag(readWriteModel.NewUnitAddress(1), nil, 1)}, nil, nil, nil),
 			},
 			setup: func(t *testing.T, fields *fields, args *args) {
-				// Setup logger
-				logger := testutils.ProduceTestingLogger(t)
-
-				loggerOption := options.WithCustomLogger(logger)
-
-				// Set the model logger to the logger above
-				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
-
-				codec := NewMessageCodec(nil, loggerOption)
-				connection := NewConnection(codec, Configuration{}, DriverContext{}, nil, nil, nil, loggerOption)
-				t.Cleanup(func() {
-					timer := time.NewTimer(1 * time.Second)
-					t.Cleanup(func() {
-						utils.CleanupTimer(timer)
-					})
-					select {
-					case <-connection.Close():
-					case <-timer.C:
-						t.Error("timeout")
-					}
-				})
-				fields.connection = connection
+				fields.addSubscriber = func(subscriber *Subscriber) {
+					assert.NotNil(t, subscriber)
+				}
 			},
 			wantAsserter: func(t *testing.T, results <-chan apiModel.PlcSubscriptionRequestResult) bool {
 				timer := time.NewTimer(2 * time.Second)
@@ -123,8 +101,8 @@ func TestSubscriber_Subscribe(t *testing.T) {
 				tt.setup(t, &tt.fields, &tt.args)
 			}
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Truef(t, tt.wantAsserter(t, m.Subscribe(tt.args.in0, tt.args.subscriptionRequest)), "Subscribe(%v, %v)", tt.args.in0, tt.args.subscriptionRequest)
 		})
@@ -133,8 +111,8 @@ func TestSubscriber_Subscribe(t *testing.T) {
 
 func TestSubscriber_Unsubscribe(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		ctx                   context.Context
@@ -158,8 +136,8 @@ func TestSubscriber_Unsubscribe(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Truef(t, tt.wantAsserter(t, m.Unsubscribe(tt.args.ctx, tt.args.unsubscriptionRequest)), "Unsubscribe(%v, %v)", tt.args.ctx, tt.args.unsubscriptionRequest)
 		})
@@ -168,8 +146,8 @@ func TestSubscriber_Unsubscribe(t *testing.T) {
 
 func TestSubscriber_handleMonitoredMMI(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		calReply model.CALReply
@@ -249,8 +227,8 @@ func TestSubscriber_handleMonitoredMMI(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Equalf(t, tt.want, m.handleMonitoredMMI(tt.args.calReply), "handleMonitoredMMI(%v)", tt.args.calReply)
 		})
@@ -259,8 +237,8 @@ func TestSubscriber_handleMonitoredMMI(t *testing.T) {
 
 func TestSubscriber_offerMMI(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		unitAddressString  string
@@ -501,8 +479,8 @@ func TestSubscriber_offerMMI(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Equalf(t, tt.want, m.offerMMI(tt.args.unitAddressString, tt.args.calData, tt.args.subscriptionHandle, tt.args.consumerProvider(t)), "offerMMI(%v,\n%v\n, \n%v\n, func())", tt.args.unitAddressString, tt.args.calData, tt.args.subscriptionHandle)
 		})
@@ -511,8 +489,8 @@ func TestSubscriber_offerMMI(t *testing.T) {
 
 func TestSubscriber_handleMonitoredSAL(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		sal model.MonitoredSAL
@@ -546,8 +524,8 @@ func TestSubscriber_handleMonitoredSAL(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Equalf(t, tt.want, m.handleMonitoredSAL(tt.args.sal), "handleMonitoredSAL(%v)", tt.args.sal)
 		})
@@ -556,8 +534,8 @@ func TestSubscriber_handleMonitoredSAL(t *testing.T) {
 
 func TestSubscriber_offerSAL(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		sal                model.MonitoredSAL
@@ -1368,8 +1346,8 @@ func TestSubscriber_offerSAL(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.Equalf(t, tt.want, m.offerSAL(tt.args.sal, tt.args.subscriptionHandle, tt.args.consumerProvider(t)), "offerSAL(\n%v\n, \n%v\n)", tt.args.sal, tt.args.subscriptionHandle)
 		})
@@ -1378,8 +1356,8 @@ func TestSubscriber_offerSAL(t *testing.T) {
 
 func TestSubscriber_Register(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		consumer apiModel.PlcSubscriptionEventConsumer
@@ -1400,8 +1378,8 @@ func TestSubscriber_Register(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			assert.NotNilf(t, m.Register(tt.args.consumer, tt.args.handles), "Register(func(), %v)", tt.args.handles)
 		})
@@ -1410,8 +1388,8 @@ func TestSubscriber_Register(t *testing.T) {
 
 func TestSubscriber_Unregister(t *testing.T) {
 	type fields struct {
-		connection *Connection
-		consumers  map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+		addSubscriber func(subscriber *Subscriber)
+		consumers     map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
 	}
 	type args struct {
 		registration apiModel.PlcConsumerRegistration
@@ -1431,8 +1409,8 @@ func TestSubscriber_Unregister(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			m := &Subscriber{
-				connection: tt.fields.connection,
-				consumers:  tt.fields.consumers,
+				addSubscriber: tt.fields.addSubscriber,
+				consumers:     tt.fields.consumers,
 			}
 			m.Unregister(tt.args.registration)
 		})


[plc4x] 01/04: refactor(plc4go/spi): split up request transaction into separate file

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit d7d54912ee102858bd60a28aaea08713a6b32bcb
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 12:29:06 2023 +0200

    refactor(plc4go/spi): split up request transaction into separate file
---
 plc4go/spi/transactions/RequestTransaction.go      | 148 ++++++++++
 .../spi/transactions/RequestTransactionManager.go  | 113 --------
 .../transactions/RequestTransactionManager_test.go | 288 -------------------
 plc4go/spi/transactions/RequestTransaction_test.go | 319 +++++++++++++++++++++
 plc4go/spi/transactions/completedFuture.go         |  34 +++
 5 files changed, 501 insertions(+), 401 deletions(-)

diff --git a/plc4go/spi/transactions/RequestTransaction.go b/plc4go/spi/transactions/RequestTransaction.go
new file mode 100644
index 0000000000..de8c5bed84
--- /dev/null
+++ b/plc4go/spi/transactions/RequestTransaction.go
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package transactions
+
+import (
+	"context"
+	"fmt"
+	"github.com/apache/plc4x/plc4go/spi/pool"
+	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
+	"sync"
+	"time"
+)
+
+// RequestTransaction represents a transaction
+type RequestTransaction interface {
+	fmt.Stringer
+	// FailRequest signals that this transaction has failed
+	FailRequest(err error) error
+	// EndRequest signals that this transaction is done
+	EndRequest() error
+	// Submit submits a RequestTransactionRunnable to the RequestTransactionManager
+	Submit(operation RequestTransactionRunnable)
+	// AwaitCompletion waits for this RequestTransaction to finish. Returns an error if it finished unsuccessfully
+	AwaitCompletion(ctx context.Context) error
+	// IsCompleted indicates whether this RequestTransaction is completed
+	IsCompleted() bool
+}
+
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
+type requestTransaction struct {
+	parent        *requestTransactionManager
+	transactionId int32
+
+	/** The initial operation to perform to kick off the request */
+	operation        pool.Runnable
+	completionFuture pool.CompletionFuture
+
+	stateChangeMutex sync.Mutex
+	completed        bool
+
+	transactionLog zerolog.Logger
+}
+
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
+
+func (t *requestTransaction) FailRequest(err error) error {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		return errors.Wrap(err, "calling fail on an already completed transaction")
+	}
+	t.transactionLog.Trace().Msg("Fail the request")
+	t.completed = true
+	return t.parent.failRequest(t, err)
+}
+
+func (t *requestTransaction) EndRequest() error {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		return errors.New("calling end on an already completed transaction")
+	}
+	t.transactionLog.Trace().Msg("Ending the request")
+	t.completed = true
+	// Remove it from Running Requests
+	return t.parent.endRequest(t)
+}
+
+func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		t.transactionLog.Warn().Msg("calling submit on an already completed transaction")
+		return
+	}
+	if t.operation != nil {
+		t.transactionLog.Warn().Msg("Operation already set")
+	}
+	t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
+	t.operation = func() {
+		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
+		operation(t)
+		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+	}
+	t.parent.submitTransaction(t)
+}
+
+func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
+	timeout, cancelFunc := context.WithTimeout(ctx, time.Minute*30) // This is intentionally set very high
+	defer cancelFunc()
+	for t.completionFuture == nil {
+		time.Sleep(time.Millisecond * 10)
+		if err := timeout.Err(); err != nil {
+			log.Error().Msg("Timeout after a long time. This means something is very off here")
+			return errors.Wrap(err, "Error waiting for completion future to be set")
+		}
+	}
+	if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
+		return err
+	}
+	stillActive := true
+	for stillActive {
+		stillActive = false
+		for _, runningRequest := range t.parent.runningRequests {
+			if runningRequest.transactionId == t.transactionId {
+				stillActive = true
+				break
+			}
+		}
+	}
+	return nil
+}
+
+func (t *requestTransaction) IsCompleted() bool {
+	return t.completed
+}
+
+func (t *requestTransaction) String() string {
+	return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
+}
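
As a reading aid, a minimal, hypothetical sketch of the lifecycle behind this interface; the manager construction and the send function are illustrative assumptions, not code from this commit:

    txm := NewRequestTransactionManager(1) // assumed: at most one concurrent request
    sendRequest := func() error { return nil } // placeholder for the actual wire send
    transaction := txm.StartTransaction()
    transaction.Submit(func(transaction RequestTransaction) {
        if err := sendRequest(); err != nil {
            _ = transaction.FailRequest(err) // completes the transaction as failed
            return
        }
        _ = transaction.EndRequest() // completes the transaction successfully
    })
    // Blocks until EndRequest or FailRequest ran (or the context is done).
    if err := transaction.AwaitCompletion(context.Background()); err != nil {
        // the transaction failed or waiting was aborted
    }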
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index c06e0de71e..8a93dc6f87 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -22,7 +22,6 @@ package transactions
 import (
 	"container/list"
 	"context"
-	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
 	"io"
@@ -46,21 +45,6 @@ func init() {
 
 type RequestTransactionRunnable func(RequestTransaction)
 
-// RequestTransaction represents a transaction
-type RequestTransaction interface {
-	fmt.Stringer
-	// FailRequest signals that this transaction has failed
-	FailRequest(err error) error
-	// EndRequest signals that this transaction is done
-	EndRequest() error
-	// Submit submits a RequestTransactionRunnable to the RequestTransactionManager
-	Submit(operation RequestTransactionRunnable)
-	// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
-	AwaitCompletion(ctx context.Context) error
-	// IsCompleted indicates that the that this RequestTransaction is completed
-	IsCompleted() bool
-}
-
 // RequestTransactionManager handles transactions
 type RequestTransactionManager interface {
 	io.Closer
@@ -109,20 +93,6 @@ type withCustomExecutor struct {
 	executor pool.Executor
 }
 
-type requestTransaction struct {
-	parent        *requestTransactionManager
-	transactionId int32
-
-	/** The initial operation to perform to kick off the request */
-	operation        pool.Runnable
-	completionFuture pool.CompletionFuture
-
-	stateChangeMutex sync.Mutex
-	completed        bool
-
-	transactionLog zerolog.Logger
-}
-
 type requestTransactionManager struct {
 	runningRequests []*requestTransaction
 	// How many transactions are allowed to run at the same time?
@@ -189,18 +159,6 @@ func (r *requestTransactionManager) processWorklog() {
 	}
 }
 
-type completedFuture struct {
-	err error
-}
-
-func (c completedFuture) AwaitCompletion(_ context.Context) error {
-	return c.err
-}
-
-func (completedFuture) Cancel(_ bool, _ error) {
-	// No op
-}
-
 func (r *requestTransactionManager) StartTransaction() RequestTransaction {
 	r.transactionMutex.Lock()
 	defer r.transactionMutex.Unlock()
@@ -288,74 +246,3 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 	r.runningRequests = nil
 	return r.executor.Close()
 }
-
-func (t *requestTransaction) FailRequest(err error) error {
-	t.stateChangeMutex.Lock()
-	defer t.stateChangeMutex.Unlock()
-	if t.completed {
-		return errors.Wrap(err, "calling fail on a already completed transaction")
-	}
-	t.transactionLog.Trace().Msg("Fail the request")
-	t.completed = true
-	return t.parent.failRequest(t, err)
-}
-
-func (t *requestTransaction) EndRequest() error {
-	t.stateChangeMutex.Lock()
-	defer t.stateChangeMutex.Unlock()
-	if t.completed {
-		return errors.New("calling end on a already completed transaction")
-	}
-	t.transactionLog.Trace().Msg("Ending the request")
-	t.completed = true
-	// Remove it from Running Requests
-	return t.parent.endRequest(t)
-}
-
-func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
-	t.stateChangeMutex.Lock()
-	defer t.stateChangeMutex.Unlock()
-	if t.completed {
-		t.transactionLog.Warn().Msg("calling submit on a already completed transaction")
-		return
-	}
-	if t.operation != nil {
-		t.transactionLog.Warn().Msg("Operation already set")
-	}
-	t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
-	t.operation = func() {
-		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
-		operation(t)
-		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
-	}
-	t.parent.submitTransaction(t)
-}
-
-func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
-	for t.completionFuture == nil {
-		time.Sleep(time.Millisecond * 10)
-		// TODO: this should timeout and not loop infinite...
-	}
-	if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
-		return err
-	}
-	stillActive := true
-	for stillActive {
-		stillActive = false
-		for _, runningRequest := range t.parent.runningRequests {
-			if runningRequest.transactionId == t.transactionId {
-				stillActive = true
-				break
-			}
-		}
-	}
-	return nil
-}
-
-func (t *requestTransaction) IsCompleted() bool {
-	return t.completed
-}
-
-func (t *requestTransaction) String() string {
-	return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
-}
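
The graceful-shutdown path above ends by dropping the remaining running requests and closing the executor. A hedged caller-side sketch (the timeout value is an illustrative assumption):

    // Assumed pattern: allow running transactions some time to drain, then
    // drop them and close the underlying worker pool.
    if err := txm.CloseGraceful(5 * time.Second); err != nil {
        // the executor could not be shut down cleanly
    }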
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 7384f87871..4445edad1e 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -26,10 +26,8 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
 	"github.com/apache/plc4x/plc4go/spi/testutils"
-	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/mock"
 	"testing"
 	"time"
 )
@@ -453,292 +451,6 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) {
 	}
 }
 
-func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
-	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
-		transactionLog   zerolog.Logger
-	}
-	type args struct {
-		ctx context.Context
-	}
-	tests := []struct {
-		name      string
-		fields    fields
-		args      args
-		mockSetup func(t *testing.T, fields *fields, args *args)
-		wantErr   bool
-	}{
-		{
-			name: "just wait",
-			fields: fields{
-				parent: &requestTransactionManager{
-					runningRequests: []*requestTransaction{
-						func() *requestTransaction {
-							r := &requestTransaction{}
-							go func() {
-								time.Sleep(100 * time.Millisecond)
-								// We fake an ending transaction like that
-								r.transactionId = 1
-							}()
-							return r
-						}(),
-					},
-				},
-			},
-			args: args{
-				ctx: func() context.Context {
-					ctx, cancelFunc := context.WithCancel(context.Background())
-					cancelFunc()
-					return ctx
-				}(),
-			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
-				completionFuture := NewMockCompletionFuture(t)
-				expect := completionFuture.EXPECT()
-				expect.AwaitCompletion(mock.Anything).Return(nil)
-				fields.completionFuture = completionFuture
-			},
-		},
-	}
-	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
-			if tt.mockSetup != nil {
-				tt.mockSetup(t1, &tt.fields, &tt.args)
-			}
-			t := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   tt.fields.transactionLog,
-			}
-			if err := t.AwaitCompletion(tt.args.ctx); (err != nil) != tt.wantErr {
-				t1.Errorf("AwaitCompletion() error = %v, wantErr %v", err, tt.wantErr)
-			}
-		})
-	}
-}
-
-func Test_requestTransaction_EndRequest(t1 *testing.T) {
-	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
-		transactionLog   zerolog.Logger
-		completed        bool
-	}
-	tests := []struct {
-		name    string
-		fields  fields
-		wantErr bool
-	}{
-		{
-			name: "just end it",
-			fields: fields{
-				parent: &requestTransactionManager{},
-			},
-			wantErr: true,
-		},
-		{
-			name: "end it completed",
-			fields: fields{
-				parent:    &requestTransactionManager{},
-				completed: true,
-			},
-			wantErr: true,
-		},
-	}
-	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
-			t := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   tt.fields.transactionLog,
-				completed:        tt.fields.completed,
-			}
-			if err := t.EndRequest(); (err != nil) != tt.wantErr {
-				t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr)
-			}
-		})
-	}
-}
-
-func Test_requestTransaction_FailRequest(t1 *testing.T) {
-	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
-		transactionLog   zerolog.Logger
-		completed        bool
-	}
-	type args struct {
-		err error
-	}
-	tests := []struct {
-		name      string
-		fields    fields
-		args      args
-		mockSetup func(t *testing.T, fields *fields, args *args)
-		wantErr   assert.ErrorAssertionFunc
-	}{
-		{
-			name: "just fail it",
-			fields: fields{
-				parent: &requestTransactionManager{},
-			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
-				completionFuture := NewMockCompletionFuture(t)
-				expect := completionFuture.EXPECT()
-				expect.Cancel(true, nil).Return()
-				fields.completionFuture = completionFuture
-			},
-			wantErr: assert.Error,
-		},
-		{
-			name: "just fail it (completed)",
-			args: args{
-				err: errors.New("nope"),
-			},
-			fields: fields{
-				parent:    &requestTransactionManager{},
-				completed: true,
-			},
-			wantErr: assert.Error,
-		},
-	}
-	for _, tt := range tests {
-		t1.Run(tt.name, func(t *testing.T) {
-			if tt.mockSetup != nil {
-				tt.mockSetup(t, &tt.fields, &tt.args)
-			}
-			r := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   tt.fields.transactionLog,
-				completed:        tt.fields.completed,
-			}
-			tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err)
-		})
-	}
-}
-
-func Test_requestTransaction_String(t1 *testing.T) {
-	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
-		transactionLog   zerolog.Logger
-	}
-	tests := []struct {
-		name   string
-		fields fields
-		want   string
-	}{
-		{
-			name: "give a string",
-			want: "Transaction{tid:0}",
-		},
-	}
-	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
-			t := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   tt.fields.transactionLog,
-			}
-			if got := t.String(); got != tt.want {
-				t1.Errorf("String() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
-func Test_requestTransaction_Submit(t1 *testing.T) {
-	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
-		transactionLog   zerolog.Logger
-		completed        bool
-	}
-	type args struct {
-		operation RequestTransactionRunnable
-	}
-	tests := []struct {
-		name   string
-		fields fields
-		args   args
-	}{
-		{
-			name: "submit something",
-			fields: fields{
-				parent: &requestTransactionManager{},
-			},
-			args: args{
-				operation: func(_ RequestTransaction) {
-					// NOOP
-				},
-			},
-		},
-		{
-			name: "submit something again",
-			fields: fields{
-				parent: &requestTransactionManager{},
-				operation: func() {
-					// NOOP
-				},
-			},
-			args: args{
-				operation: func(_ RequestTransaction) {
-					// NOOP
-				},
-			},
-		},
-		{
-			name: "submit completed",
-			fields: fields{
-				parent: &requestTransactionManager{},
-				operation: func() {
-					// NOOP
-				},
-				completed: true,
-			},
-			args: args{
-				operation: func(_ RequestTransaction) {
-					// NOOP
-				},
-			},
-		},
-	}
-	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
-			t := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   tt.fields.transactionLog,
-				completed:        tt.fields.completed,
-			}
-			t.Submit(tt.args.operation)
-			t.operation()
-		})
-	}
-}
-
 func Test_requestTransactionManager_Close(t *testing.T) {
 	type fields struct {
 		runningRequests                     []*requestTransaction
diff --git a/plc4go/spi/transactions/RequestTransaction_test.go b/plc4go/spi/transactions/RequestTransaction_test.go
new file mode 100644
index 0000000000..5dad61f345
--- /dev/null
+++ b/plc4go/spi/transactions/RequestTransaction_test.go
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package transactions
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/apache/plc4x/plc4go/spi/pool"
+
+	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+)
+
+func Test_requestTransaction_EndRequest(t1 *testing.T) {
+	type fields struct {
+		parent           *requestTransactionManager
+		transactionId    int32
+		operation        pool.Runnable
+		completionFuture pool.CompletionFuture
+		transactionLog   zerolog.Logger
+		completed        bool
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		wantErr bool
+	}{
+		{
+			name: "just end it",
+			fields: fields{
+				parent: &requestTransactionManager{},
+			},
+			wantErr: true,
+		},
+		{
+			name: "end it completed",
+			fields: fields{
+				parent:    &requestTransactionManager{},
+				completed: true,
+			},
+			wantErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &requestTransaction{
+				parent:           tt.fields.parent,
+				transactionId:    tt.fields.transactionId,
+				operation:        tt.fields.operation,
+				completionFuture: tt.fields.completionFuture,
+				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
+			}
+			if err := t.EndRequest(); (err != nil) != tt.wantErr {
+				t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func Test_requestTransaction_FailRequest(t1 *testing.T) {
+	type fields struct {
+		parent           *requestTransactionManager
+		transactionId    int32
+		operation        pool.Runnable
+		completionFuture pool.CompletionFuture
+		transactionLog   zerolog.Logger
+		completed        bool
+	}
+	type args struct {
+		err error
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		args      args
+		mockSetup func(t *testing.T, fields *fields, args *args)
+		wantErr   assert.ErrorAssertionFunc
+	}{
+		{
+			name: "just fail it",
+			fields: fields{
+				parent: &requestTransactionManager{},
+			},
+			mockSetup: func(t *testing.T, fields *fields, args *args) {
+				completionFuture := NewMockCompletionFuture(t)
+				expect := completionFuture.EXPECT()
+				expect.Cancel(true, nil).Return()
+				fields.completionFuture = completionFuture
+			},
+			wantErr: assert.Error,
+		},
+		{
+			name: "just fail it (completed)",
+			args: args{
+				err: errors.New("nope"),
+			},
+			fields: fields{
+				parent:    &requestTransactionManager{},
+				completed: true,
+			},
+			wantErr: assert.Error,
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t *testing.T) {
+			if tt.mockSetup != nil {
+				tt.mockSetup(t, &tt.fields, &tt.args)
+			}
+			r := &requestTransaction{
+				parent:           tt.fields.parent,
+				transactionId:    tt.fields.transactionId,
+				operation:        tt.fields.operation,
+				completionFuture: tt.fields.completionFuture,
+				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
+			}
+			tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err)
+		})
+	}
+}
+
+func Test_requestTransaction_String(t1 *testing.T) {
+	type fields struct {
+		parent           *requestTransactionManager
+		transactionId    int32
+		operation        pool.Runnable
+		completionFuture pool.CompletionFuture
+		transactionLog   zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "give a string",
+			want: "Transaction{tid:0}",
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &requestTransaction{
+				parent:           tt.fields.parent,
+				transactionId:    tt.fields.transactionId,
+				operation:        tt.fields.operation,
+				completionFuture: tt.fields.completionFuture,
+				transactionLog:   tt.fields.transactionLog,
+			}
+			if got := t.String(); got != tt.want {
+				t1.Errorf("String() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func Test_requestTransaction_Submit(t1 *testing.T) {
+	type fields struct {
+		parent           *requestTransactionManager
+		transactionId    int32
+		operation        pool.Runnable
+		completionFuture pool.CompletionFuture
+		transactionLog   zerolog.Logger
+		completed        bool
+	}
+	type args struct {
+		operation RequestTransactionRunnable
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args
+	}{
+		{
+			name: "submit something",
+			fields: fields{
+				parent: &requestTransactionManager{},
+			},
+			args: args{
+				operation: func(_ RequestTransaction) {
+					// NOOP
+				},
+			},
+		},
+		{
+			name: "submit something again",
+			fields: fields{
+				parent: &requestTransactionManager{},
+				operation: func() {
+					// NOOP
+				},
+			},
+			args: args{
+				operation: func(_ RequestTransaction) {
+					// NOOP
+				},
+			},
+		},
+		{
+			name: "submit completed",
+			fields: fields{
+				parent: &requestTransactionManager{},
+				operation: func() {
+					// NOOP
+				},
+				completed: true,
+			},
+			args: args{
+				operation: func(_ RequestTransaction) {
+					// NOOP
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &requestTransaction{
+				parent:           tt.fields.parent,
+				transactionId:    tt.fields.transactionId,
+				operation:        tt.fields.operation,
+				completionFuture: tt.fields.completionFuture,
+				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
+			}
+			t.Submit(tt.args.operation)
+			t.operation()
+		})
+	}
+}
+
+func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
+	type fields struct {
+		parent           *requestTransactionManager
+		transactionId    int32
+		operation        pool.Runnable
+		completionFuture pool.CompletionFuture
+		transactionLog   zerolog.Logger
+	}
+	type args struct {
+		ctx context.Context
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		args      args
+		mockSetup func(t *testing.T, fields *fields, args *args)
+		wantErr   bool
+	}{
+		{
+			name: "just wait",
+			fields: fields{
+				parent: &requestTransactionManager{
+					runningRequests: []*requestTransaction{
+						func() *requestTransaction {
+							r := &requestTransaction{}
+							go func() {
+								time.Sleep(100 * time.Millisecond)
+								// We fake an ending transaction like that
+								r.transactionId = 1
+							}()
+							return r
+						}(),
+					},
+				},
+			},
+			args: args{
+				ctx: func() context.Context {
+					ctx, cancelFunc := context.WithCancel(context.Background())
+					cancelFunc()
+					return ctx
+				}(),
+			},
+			mockSetup: func(t *testing.T, fields *fields, args *args) {
+				completionFuture := NewMockCompletionFuture(t)
+				expect := completionFuture.EXPECT()
+				expect.AwaitCompletion(mock.Anything).Return(nil)
+				fields.completionFuture = completionFuture
+			},
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			if tt.mockSetup != nil {
+				tt.mockSetup(t1, &tt.fields, &tt.args)
+			}
+			t := &requestTransaction{
+				parent:           tt.fields.parent,
+				transactionId:    tt.fields.transactionId,
+				operation:        tt.fields.operation,
+				completionFuture: tt.fields.completionFuture,
+				transactionLog:   tt.fields.transactionLog,
+			}
+			if err := t.AwaitCompletion(tt.args.ctx); (err != nil) != tt.wantErr {
+				t1.Errorf("AwaitCompletion() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
diff --git a/plc4go/spi/transactions/completedFuture.go b/plc4go/spi/transactions/completedFuture.go
new file mode 100644
index 0000000000..ac8227f579
--- /dev/null
+++ b/plc4go/spi/transactions/completedFuture.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package transactions
+
+import "context"
+
+type completedFuture struct {
+	err error
+}
+
+func (c completedFuture) AwaitCompletion(_ context.Context) error {
+	return c.err
+}
+
+func (completedFuture) Cancel(_ bool, _ error) {
+	// No op
+}
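
In short, completedFuture is a pre-resolved stand-in for a pool.CompletionFuture, useful wherever the transaction manager must hand out a future for work that already finished or failed. A hypothetical illustration:

    // AwaitCompletion returns the stored error immediately; Cancel is a deliberate no-op.
    var f pool.CompletionFuture = completedFuture{err: errors.New("failed before submission")}
    _ = f.AwaitCompletion(context.Background()) // yields the stored error at once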


[plc4x] 02/04: refactor(plc4go/spi): split up pool into multiple files

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit ade510700aab3ef7200064501bd2f01152919aa9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 12:44:07 2023 +0200

    refactor(plc4go/spi): split up pool into multiple files
---
 plc4go/spi/pool/CompletionFuture.go      |  90 ++++
 plc4go/spi/pool/CompletionFuture_test.go | 191 +++++++++
 plc4go/spi/pool/WorkerPool.go            | 393 +-----------------
 plc4go/spi/pool/WorkerPool_test.go       | 689 -------------------------------
 plc4go/spi/pool/dynamicExecutor.go       | 173 ++++++++
 plc4go/spi/pool/dynamicExecutor_test.go  | 170 ++++++++
 plc4go/spi/pool/executor.go              | 129 ++++++
 plc4go/spi/pool/executor_test.go         | 413 ++++++++++++++++++
 plc4go/spi/pool/workItem.go              |  32 ++
 plc4go/spi/pool/workItem_test.go         |  53 +++
 plc4go/spi/pool/worker.go                |  96 +++++
 plc4go/spi/pool/worker_test.go           | 227 ++++++++++
 12 files changed, 1585 insertions(+), 1071 deletions(-)

diff --git a/plc4go/spi/pool/CompletionFuture.go b/plc4go/spi/pool/CompletionFuture.go
new file mode 100644
index 0000000000..5d11002356
--- /dev/null
+++ b/plc4go/spi/pool/CompletionFuture.go
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/pkg/errors"
+)
+
+type CompletionFuture interface {
+	AwaitCompletion(ctx context.Context) error
+	Cancel(interrupt bool, err error)
+}
+
+type future struct {
+	cancelRequested    atomic.Bool
+	interruptRequested atomic.Bool
+	completed          atomic.Bool
+	errored            atomic.Bool
+	err                atomic.Value
+}
+
+func (f *future) Cancel(interrupt bool, err error) {
+	f.cancelRequested.Store(true)
+	f.interruptRequested.Store(interrupt)
+	if err != nil {
+		f.errored.Store(true)
+		f.err.Store(err)
+	}
+}
+
+func (f *future) complete() {
+	f.completed.Store(true)
+}
+
+// Canceled is returned on CompletionFuture.AwaitCompletion when a CompletionFuture was canceled
+var Canceled = errors.New("Canceled")
+
+func (f *future) AwaitCompletion(ctx context.Context) error {
+	for !f.completed.Load() && !f.errored.Load() && !f.cancelRequested.Load() && ctx.Err() == nil {
+		time.Sleep(time.Millisecond * 10)
+	}
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	if err, ok := f.err.Load().(error); ok {
+		return err
+	}
+	if f.cancelRequested.Load() {
+		return Canceled
+	}
+	return nil
+}
+
+func (f *future) String() string {
+	return fmt.Sprintf("future{\n"+
+		"\tcancelRequested: %t,\n"+
+		"\tinterruptRequested: %t,\n"+
+		"\tcompleted: %t,\n"+
+		"\terrored: %t,\n"+
+		"\terr: %v,\n"+
+		"}",
+		f.cancelRequested.Load(),
+		f.interruptRequested.Load(),
+		f.completed.Load(),
+		f.errored.Load(),
+		f.err.Load(),
+	)
+}
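
The polling semantics of future in one small sketch; the goroutine body and the delay are illustrative assumptions:

    f := &future{}
    go func() {
        time.Sleep(50 * time.Millisecond) // illustrative delay
        f.complete()                      // AwaitCompletion then returns nil
    }()
    // Polls in 10ms steps until completed, errored, canceled, or ctx is done;
    // Cancel(interrupt, nil) surfaces as the package-level Canceled error.
    if err := f.AwaitCompletion(context.Background()); err != nil {
        // would be ctx.Err(), a stored cancel error, or Canceled
    }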
diff --git a/plc4go/spi/pool/CompletionFuture_test.go b/plc4go/spi/pool/CompletionFuture_test.go
new file mode 100644
index 0000000000..79fdaeace7
--- /dev/null
+++ b/plc4go/spi/pool/CompletionFuture_test.go
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"fmt"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+func Test_future_AwaitCompletion(t *testing.T) {
+	type args struct {
+		ctx context.Context
+	}
+	tests := []struct {
+		name      string
+		args      args
+		completer func(*future)
+		wantErr   assert.ErrorAssertionFunc
+	}{
+		{
+			name: "completes with error",
+			args: args{ctx: context.TODO()},
+			completer: func(f *future) {
+				f.Cancel(false, errors.New("Uh oh"))
+			},
+			wantErr: assert.Error,
+		},
+		{
+			name: "completes regular",
+			args: args{ctx: context.TODO()},
+			completer: func(f *future) {
+				time.Sleep(time.Millisecond * 30)
+				f.complete()
+			},
+			wantErr: assert.NoError,
+		},
+		{
+			name: "completes not in time",
+			args: args{ctx: func() context.Context {
+				deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Millisecond))
+				t.Cleanup(cancel)
+				return deadline
+			}()},
+			completer: func(f *future) {
+				time.Sleep(time.Millisecond * 300)
+			},
+			wantErr: assert.Error,
+		},
+		{
+			name: "completes canceled without error",
+			args: args{ctx: context.TODO()},
+			completer: func(f *future) {
+				time.Sleep(time.Millisecond * 300)
+				f.Cancel(true, nil)
+			},
+			wantErr: func(t assert.TestingT, err error, i ...any) bool {
+				assert.Same(t, Canceled, err)
+				return true
+			},
+		},
+		{
+			name: "completes canceled with particular error",
+			args: args{ctx: context.TODO()},
+			completer: func(f *future) {
+				time.Sleep(time.Millisecond * 300)
+				f.Cancel(true, errors.New("Uh oh"))
+			},
+			wantErr: func(t assert.TestingT, err error, i ...any) bool {
+				assert.Equal(t, "Uh oh", err.Error())
+				return true
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			f := &future{}
+			go tt.completer(f)
+			tt.wantErr(t, f.AwaitCompletion(tt.args.ctx), fmt.Sprintf("AwaitCompletion(%v)", tt.args.ctx))
+		})
+	}
+}
+
+func Test_future_Cancel(t *testing.T) {
+	type args struct {
+		interrupt bool
+		err       error
+	}
+	tests := []struct {
+		name     string
+		args     args
+		verifier func(*testing.T, *future)
+	}{
+		{
+			name: "cancel cancels",
+			verifier: func(t *testing.T, f *future) {
+				assert.True(t, f.cancelRequested.Load())
+			},
+		},
+		{
+			name: "cancel with interrupt",
+			args: args{
+				interrupt: true,
+				err:       nil,
+			},
+			verifier: func(t *testing.T, f *future) {
+				assert.True(t, f.cancelRequested.Load())
+				assert.False(t, f.errored.Load())
+				assert.Nil(t, f.err.Load())
+			},
+		},
+		{
+			name: "cancel with err",
+			args: args{
+				interrupt: true,
+				err:       errors.New("Uh Oh"),
+			},
+			verifier: func(t *testing.T, f *future) {
+				assert.True(t, f.cancelRequested.Load())
+				assert.True(t, f.errored.Load())
+				assert.NotNil(t, f.err.Load())
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			f := &future{}
+			f.Cancel(tt.args.interrupt, tt.args.err)
+			tt.verifier(t, f)
+		})
+	}
+}
+
+func Test_future_String(t *testing.T) {
+	tests := []struct {
+		name string
+		want string
+	}{
+		{
+			name: "string it",
+			want: "future{\n\tcancelRequested: false,\n\tinterruptRequested: false,\n\tcompleted: false,\n\terrored: false,\n\terr: <nil>,\n}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			f := &future{}
+			assert.Equalf(t, tt.want, f.String(), "String()")
+		})
+	}
+}
+
+func Test_future_complete(t *testing.T) {
+	tests := []struct {
+		name     string
+		verifier func(*testing.T, *future)
+	}{
+		{
+			name: "complete completes",
+			verifier: func(t *testing.T, f *future) {
+				assert.True(t, f.completed.Load())
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			f := &future{}
+			f.complete()
+			tt.verifier(t, f)
+		})
+	}
+}
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 85f3ef8064..d83e770dc9 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -21,97 +21,13 @@ package pool
 
 import (
 	"context"
-	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/apache/plc4x/plc4go/spi/utils"
-	"github.com/pkg/errors"
-	"github.com/rs/zerolog"
 	"io"
-	"runtime/debug"
-	"sync"
-	"sync/atomic"
 	"time"
 )
 
 type Runnable func()
 
-type worker struct {
-	id          int
-	shutdown    atomic.Bool
-	interrupted atomic.Bool
-	interrupter chan struct{}
-	executor    interface {
-		isTraceWorkers() bool
-		getWorksItems() chan workItem
-		getWorkerWaitGroup() *sync.WaitGroup
-	}
-	hasEnded     atomic.Bool
-	lastReceived time.Time
-
-	log zerolog.Logger
-}
-
-func (w *worker) initialize() {
-	w.shutdown.Store(false)
-	w.interrupted.Store(false)
-	w.interrupter = make(chan struct{}, 1)
-	w.hasEnded.Store(false)
-	w.lastReceived = time.Now()
-}
-
-func (w *worker) work() {
-	w.executor.getWorkerWaitGroup().Add(1)
-	defer w.executor.getWorkerWaitGroup().Done()
-	defer func() {
-		if err := recover(); err != nil {
-			w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
-		}
-		if !w.shutdown.Load() {
-			// if we are not in shutdown we continue
-			w.work()
-		}
-	}()
-	workerLog := w.log.With().Int("Worker id", w.id).Logger()
-	if !w.executor.isTraceWorkers() {
-		workerLog = zerolog.Nop()
-	}
-	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
-	w.hasEnded.Store(false)
-	workerLog.Debug().Msgf("setting to not ended")
-
-	for !w.shutdown.Load() {
-		workerLog.Debug().Msg("Working")
-		select {
-		case _workItem := <-w.executor.getWorksItems():
-			w.lastReceived = time.Now()
-			workerLog.Debug().Msgf("Got work item %v", _workItem)
-			if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
-				workerLog.Debug().Msg("We need to stop")
-				// TODO: do we need to complete with a error?
-			} else {
-				workerLog.Debug().Msgf("Running work item %v", _workItem)
-				_workItem.runnable()
-				_workItem.completionFuture.complete()
-				workerLog.Debug().Msgf("work item %v completed", _workItem)
-			}
-		case <-w.interrupter:
-			workerLog.Debug().Msg("We got interrupted")
-		}
-	}
-	w.hasEnded.Store(true)
-	workerLog.Debug().Msg("setting to ended")
-}
-
-type workItem struct {
-	workItemId       int32
-	runnable         Runnable
-	completionFuture *future
-}
-
-func (w workItem) String() string {
-	return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
-}
-
 type Executor interface {
 	io.Closer
 	Start()
@@ -120,43 +36,6 @@ type Executor interface {
 	IsRunning() bool
 }
 
-type executor struct {
-	running      bool
-	shutdown     bool
-	stateChange  sync.Mutex
-	worker       []*worker
-	queueDepth   int
-	workItems    chan workItem
-	traceWorkers bool
-
-	workerWaitGroup sync.WaitGroup
-
-	log zerolog.Logger
-}
-
-func (e *executor) isTraceWorkers() bool {
-	return e.traceWorkers
-}
-
-func (e *executor) getWorksItems() chan workItem {
-	return e.workItems
-}
-
-func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
-	return &e.workerWaitGroup
-}
-
-type dynamicExecutor struct {
-	*executor
-
-	maxNumberOfWorkers     int
-	currentNumberOfWorkers atomic.Int32
-	dynamicStateChange     sync.Mutex
-	interrupter            chan struct{}
-
-	dynamicWorkers sync.WaitGroup
-}
-
 func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	workers := make([]*worker, numberOfWorkers)
 	customLogger := options.ExtractCustomLogger(_options...)
@@ -184,10 +63,6 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
 	return _executor
 }
 
-var upScaleInterval = 100 * time.Millisecond
-var downScaleInterval = 5 * time.Second
-var timeToBecomeUnused = 5 * time.Second
-
 func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	customLogger := options.ExtractCustomLogger(_options...)
 	_executor := &dynamicExecutor{
@@ -219,265 +94,19 @@ func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
 	return &tracerWorkersOption{traceWorkers: traceWorkers}
 }
 
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
 type tracerWorkersOption struct {
 	options.Option
 	traceWorkers bool
 }
 
-func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
-	if runnable == nil {
-		value := atomic.Value{}
-		value.Store(errors.New("runnable must not be nil"))
-		return &future{err: value}
-	}
-	e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
-	completionFuture := &future{}
-	if e.shutdown {
-		completionFuture.Cancel(false, errors.New("executor in shutdown"))
-		return completionFuture
-	}
-	select {
-	case e.workItems <- workItem{
-		workItemId:       workItemId,
-		runnable:         runnable,
-		completionFuture: completionFuture,
-	}:
-		e.log.Trace().Msg("Item added")
-	case <-ctx.Done():
-		completionFuture.Cancel(false, ctx.Err())
-	}
-
-	e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
-	return completionFuture
-}
-
-func (e *executor) Start() {
-	e.stateChange.Lock()
-	defer e.stateChange.Unlock()
-	if e.running || e.shutdown {
-		e.log.Warn().Msg("Already started")
-		return
-	}
-	e.running = true
-	e.shutdown = false
-	for i := 0; i < len(e.worker); i++ {
-		worker := e.worker[i]
-		worker.initialize()
-		go worker.work()
-	}
-}
-
-func (e *dynamicExecutor) Start() {
-	e.dynamicStateChange.Lock()
-	defer e.dynamicStateChange.Unlock()
-	if e.running || e.shutdown {
-		e.log.Warn().Msg("Already started")
-		return
-	}
-	if e.interrupter != nil {
-		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
-		close(e.interrupter)
-		e.dynamicWorkers.Wait()
-	}
-
-	e.executor.Start()
-	mutex := sync.Mutex{}
-	e.interrupter = make(chan struct{})
-	// Worker spawner
-	go func() {
-		e.dynamicWorkers.Add(1)
-		defer e.dynamicWorkers.Done()
-		defer func() {
-			if err := recover(); err != nil {
-				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
-			}
-		}()
-		workerLog := e.log.With().Str("Worker type", "spawner").Logger()
-		if !e.traceWorkers {
-			workerLog = zerolog.Nop()
-		}
-		for e.running && !e.shutdown {
-			workerLog.Trace().Msg("running")
-			mutex.Lock()
-			numberOfItemsInQueue := len(e.workItems)
-			numberOfWorkers := len(e.worker)
-			workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
-			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
-				workerLog.Trace().Msg("spawning new worker")
-				_worker := &worker{
-					id:           numberOfWorkers - 1,
-					interrupter:  make(chan struct{}, 1),
-					executor:     e,
-					lastReceived: time.Now(),
-					log:          e.log,
-				}
-				e.worker = append(e.worker, _worker)
-				_worker.initialize()
-				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
-				go _worker.work()
-				e.currentNumberOfWorkers.Add(1)
-			} else {
-				workerLog.Trace().Msg("Nothing to scale")
-			}
-			mutex.Unlock()
-			func() {
-				workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
-				timer := time.NewTimer(upScaleInterval)
-				defer utils.CleanupTimer(timer)
-				select {
-				case <-timer.C:
-				case <-e.interrupter:
-					workerLog.Info().Msg("interrupted")
-				}
-			}()
-		}
-		workerLog.Info().Msg("Terminated")
-	}()
-	// Worker killer
-	go func() {
-		e.dynamicWorkers.Add(1)
-		defer e.dynamicWorkers.Done()
-		defer func() {
-			if err := recover(); err != nil {
-				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
-			}
-		}()
-		workerLog := e.log.With().Str("Worker type", "killer").Logger()
-		if !e.traceWorkers {
-			workerLog = zerolog.Nop()
-		}
-		for e.running && !e.shutdown {
-			workerLog.Trace().Msg("running")
-			mutex.Lock()
-			newWorkers := make([]*worker, 0)
-			for _, _worker := range e.worker {
-				deadline := time.Now().Add(-timeToBecomeUnused)
-				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
-				if _worker.lastReceived.Before(deadline) {
-					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
-					_worker.interrupted.Store(true)
-					close(_worker.interrupter)
-					e.currentNumberOfWorkers.Add(-1)
-				} else {
-					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
-					newWorkers = append(newWorkers, _worker)
-				}
-			}
-			e.worker = newWorkers
-			mutex.Unlock()
-			func() {
-				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
-				timer := time.NewTimer(downScaleInterval)
-				defer utils.CleanupTimer(timer)
-				select {
-				case <-timer.C:
-				case <-e.interrupter:
-					workerLog.Info().Msg("interrupted")
-				}
-			}()
-		}
-		workerLog.Info().Msg("Terminated")
-	}()
-}
-
-func (e *executor) Stop() {
-	e.log.Trace().Msg("stopping now")
-	e.stateChange.Lock()
-	defer e.stateChange.Unlock()
-	if !e.running || e.shutdown {
-		e.log.Warn().Msg("already stopped")
-		return
-	}
-	e.shutdown = true
-	for i := 0; i < len(e.worker); i++ {
-		worker := e.worker[i]
-		worker.shutdown.Store(true)
-		worker.interrupted.Store(true)
-		close(worker.interrupter)
-	}
-	e.running = false
-	e.shutdown = false
-	e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
-	e.workerWaitGroup.Wait()
-	e.log.Trace().Msg("stopped")
-}
-
-func (e *dynamicExecutor) Stop() {
-	e.log.Trace().Msg("stopping now")
-	e.dynamicStateChange.Lock()
-	defer e.dynamicStateChange.Unlock()
-	if !e.running || e.shutdown {
-		e.log.Warn().Msg("already stopped")
-		return
-	}
-	close(e.interrupter)
-	e.log.Trace().Msg("stopping inner executor")
-	e.executor.Stop()
-	e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
-	e.dynamicWorkers.Wait()
-	e.log.Trace().Msg("stopped")
-}
-
-func (e *executor) Close() error {
-	e.Stop()
-	return nil
-}
-
-func (e *executor) IsRunning() bool {
-	return e.running && !e.shutdown
-}
-
-type CompletionFuture interface {
-	AwaitCompletion(ctx context.Context) error
-	Cancel(interrupt bool, err error)
-}
-
-type future struct {
-	cancelRequested    atomic.Bool
-	interruptRequested atomic.Bool
-	completed          atomic.Bool
-	errored            atomic.Bool
-	err                atomic.Value
-}
-
-func (f *future) Cancel(interrupt bool, err error) {
-	f.cancelRequested.Store(true)
-	f.interruptRequested.Store(interrupt)
-	if err != nil {
-		f.errored.Store(true)
-		f.err.Store(err)
-	}
-}
-
-func (f *future) complete() {
-	f.completed.Store(true)
-}
-
-// Canceled is returned on CompletionFuture.AwaitCompletion when a CompletionFuture was canceled
-var Canceled = errors.New("Canceled")
-
-func (f *future) AwaitCompletion(ctx context.Context) error {
-	for !f.completed.Load() && !f.errored.Load() && !f.cancelRequested.Load() && ctx.Err() == nil {
-		time.Sleep(time.Millisecond * 10)
-	}
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-	if err, ok := f.err.Load().(error); ok {
-		return err
-	}
-	if f.cancelRequested.Load() {
-		return Canceled
-	}
-	return nil
-}
-
-func (f *future) String() string {
-	return fmt.Sprintf("future: cancelRequested(%t), interruptRequested(%t), completed(%t), errored(%t), err(%v)",
-		f.cancelRequested.Load(),
-		f.interruptRequested.Load(),
-		f.completed.Load(),
-		f.errored.Load(),
-		f.err.Load(),
-	)
-}
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
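[editor note] For orientation: after this split, WorkerPool.go keeps only the public surface of the pool package (the Executor interface, the two constructors and the tracer-worker option), while the moving parts live in the new files below. A minimal usage sketch follows, assuming Submit and Stop are part of the Executor interface, as the implementation and the tests suggest; the import path follows the repository layout:

package main

import (
	"context"
	"fmt"

	"github.com/apache/plc4x/plc4go/spi/pool"
)

func main() {
	// 4 workers sharing one queue with room for 16 work items.
	executor := pool.NewFixedSizeExecutor(4, 16)
	executor.Start()
	defer executor.Close() // io.Closer, delegates to Stop()

	future := executor.Submit(context.Background(), 1, func() {
		fmt.Println("work item executed")
	})
	// AwaitCompletion blocks until the item completes, errors or is canceled.
	if err := future.AwaitCompletion(context.Background()); err != nil {
		fmt.Println("work item failed:", err)
	}
}
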
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index 9a62a09d25..b531d94e7e 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -21,9 +21,7 @@ package pool
 
 import (
 	"context"
-	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"math/rand"
@@ -32,212 +30,6 @@ import (
 	"time"
 )
 
-func TestExecutor_Start(t *testing.T) {
-	type fields struct {
-		running      bool
-		shutdown     bool
-		worker       []*worker
-		queue        chan workItem
-		traceWorkers bool
-	}
-	tests := []struct {
-		name      string
-		fields    fields
-		shouldRun bool
-	}{
-		{
-			name:      "Start fresh",
-			shouldRun: true,
-		},
-		{
-			name: "Start running",
-			fields: fields{
-				running: true,
-			},
-			shouldRun: true,
-		},
-		{
-			name: "Start stopping",
-			fields: fields{
-				running:  true,
-				shutdown: true,
-			},
-			shouldRun: false,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			e := &executor{
-				running:      tt.fields.running,
-				shutdown:     tt.fields.shutdown,
-				worker:       tt.fields.worker,
-				workItems:    tt.fields.queue,
-				traceWorkers: tt.fields.traceWorkers,
-			}
-			e.Start()
-			assert.Equal(t, tt.shouldRun, e.IsRunning(), "should be running")
-		})
-	}
-}
-
-func TestExecutor_Stop(t *testing.T) {
-	type fields struct {
-		running      bool
-		shutdown     bool
-		worker       []*worker
-		queue        chan workItem
-		traceWorkers bool
-	}
-	tests := []struct {
-		name      string
-		fields    fields
-		shouldRun bool
-	}{
-		{
-			name:      "Stop stopped",
-			shouldRun: false,
-		},
-		{
-			name: "Stop running",
-			fields: fields{
-				running: true,
-				queue:   make(chan workItem),
-				worker: []*worker{
-					func() *worker {
-						w := &worker{}
-						w.initialize()
-						return w
-					}(),
-				},
-			},
-			shouldRun: false,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			e := &executor{
-				running:      tt.fields.running,
-				shutdown:     tt.fields.shutdown,
-				worker:       tt.fields.worker,
-				workItems:    tt.fields.queue,
-				traceWorkers: tt.fields.traceWorkers,
-			}
-			e.Stop()
-		})
-	}
-}
-
-func TestExecutor_Submit(t *testing.T) {
-	type fields struct {
-		running      bool
-		shutdown     bool
-		worker       []*worker
-		queue        chan workItem
-		traceWorkers bool
-	}
-	type args struct {
-		workItemId int32
-		runnable   Runnable
-		context    context.Context
-	}
-	tests := []struct {
-		name                      string
-		fields                    fields
-		args                      args
-		completionFutureValidator func(t *testing.T, future CompletionFuture) bool
-		waitForCompletion         bool
-	}{
-		{
-			name: "submitting nothing",
-			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
-				return assert.Error(t, completionFuture.(*future).err.Load().(error))
-			},
-		},
-		{
-			name: "submit canceled",
-			fields: fields{
-				queue: make(chan workItem, 0),
-			},
-			args: args{
-				workItemId: 13,
-				runnable: func() {
-					// We do something for 3 seconds
-					<-time.NewTimer(3 * time.Second).C
-				},
-				context: func() context.Context {
-					ctx, cancelFunc := context.WithCancel(context.Background())
-					cancelFunc()
-					return ctx
-				}(),
-			},
-			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
-				err := completionFuture.(*future).err.Load().(error)
-				return assert.Error(t, err)
-			},
-		},
-		{
-			name: "Submit something which doesn't complete",
-			fields: fields{
-				queue: make(chan workItem, 1),
-			},
-			args: args{
-				workItemId: 13,
-				runnable: func() {
-					// We do something for 3 seconds
-					<-time.NewTimer(3 * time.Second).C
-				},
-				context: context.TODO(),
-			},
-			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
-				completed := completionFuture.(*future).completed.Load()
-				return assert.False(t, completed)
-			},
-		},
-		{
-			name: "Submit something which does complete",
-			fields: func() fields {
-				var executor = NewFixedSizeExecutor(1, 1).(*executor)
-				return fields{
-					running:      executor.running,
-					shutdown:     executor.shutdown,
-					worker:       executor.worker,
-					queue:        executor.workItems,
-					traceWorkers: true,
-				}
-			}(),
-			args: args{
-				workItemId: 13,
-				runnable: func() {
-					// NOOP
-				},
-				context: context.TODO(),
-			},
-			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
-				completed := completionFuture.(*future).completed.Load()
-				return assert.True(t, completed)
-			},
-			waitForCompletion: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			e := &executor{
-				running:      tt.fields.running,
-				shutdown:     tt.fields.shutdown,
-				worker:       tt.fields.worker,
-				workItems:    tt.fields.queue,
-				traceWorkers: tt.fields.traceWorkers,
-			}
-			e.Start()
-			completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
-			if tt.waitForCompletion {
-				assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
-			}
-			assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
-		})
-	}
-}
-
 func TestNewFixedSizeExecutor(t *testing.T) {
 	type args struct {
 		numberOfWorkers int
@@ -407,487 +199,6 @@ func TestWithExecutorOptionTracerWorkers(t *testing.T) {
 	}
 }
 
-func TestWorkItem_String(t *testing.T) {
-	type fields struct {
-		workItemId       int32
-		runnable         Runnable
-		completionFuture *future
-	}
-	tests := []struct {
-		name   string
-		fields fields
-		want   string
-	}{
-		{
-			name: "Simple test",
-			want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			w := &workItem{
-				workItemId:       tt.fields.workItemId,
-				runnable:         tt.fields.runnable,
-				completionFuture: tt.fields.completionFuture,
-			}
-			assert.Equalf(t, tt.want, w.String(), "String()")
-		})
-	}
-}
-
-func TestWorker_work(t *testing.T) {
-	type fields struct {
-		id       int
-		executor *executor
-	}
-	tests := []struct {
-		name                       string
-		fields                     fields
-		timeBeforeFirstValidation  time.Duration
-		firstValidation            func(*testing.T, *worker)
-		timeBeforeManipulation     time.Duration
-		manipulator                func(*worker)
-		timeBeforeSecondValidation time.Duration
-		secondValidation           func(*testing.T, *worker)
-	}{
-		{
-			name: "Worker should work till shutdown (even if it panics)",
-			fields: fields{
-				id: 0,
-				executor: func() *executor {
-					e := &executor{
-						workItems:    make(chan workItem),
-						traceWorkers: true,
-					}
-					go func() {
-						e.workItems <- workItem{
-							workItemId: 0,
-							runnable: func() {
-								panic("Oh no what should I do???")
-							},
-							completionFuture: &future{},
-						}
-					}()
-					return e
-				}(),
-			},
-			timeBeforeFirstValidation: 50 * time.Millisecond,
-			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
-			},
-			manipulator: func(w *worker) {
-				w.shutdown.Store(true)
-				w.interrupter <- struct{}{}
-			},
-			timeBeforeSecondValidation: 150 * time.Millisecond,
-			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
-			},
-		},
-		{
-			name: "Worker should work till shutdown",
-			fields: fields{
-				id: 1,
-				executor: func() *executor {
-					e := &executor{
-						workItems:    make(chan workItem),
-						traceWorkers: true,
-					}
-					go func() {
-						e.workItems <- workItem{
-							workItemId: 0,
-							runnable: func() {
-								time.Sleep(time.Millisecond * 70)
-							},
-							completionFuture: &future{},
-						}
-					}()
-					return e
-				}(),
-			},
-			timeBeforeFirstValidation: 50 * time.Millisecond,
-			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
-			},
-			manipulator: func(w *worker) {
-				w.shutdown.Store(true)
-			},
-			timeBeforeSecondValidation: 150 * time.Millisecond,
-			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
-			},
-		},
-		{
-			name: "Work interrupted",
-			fields: fields{
-				id: 1,
-				executor: func() *executor {
-					e := &executor{
-						workItems:    make(chan workItem),
-						traceWorkers: true,
-					}
-					return e
-				}(),
-			},
-			timeBeforeFirstValidation: 50 * time.Millisecond,
-			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
-			},
-			manipulator: func(w *worker) {
-				w.shutdown.Store(true)
-				w.interrupter <- struct{}{}
-			},
-			timeBeforeSecondValidation: 150 * time.Millisecond,
-			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
-			},
-		},
-		{
-			name: "Work on canceled",
-			fields: fields{
-				id: 1,
-				executor: func() *executor {
-					e := &executor{
-						workItems:    make(chan workItem),
-						traceWorkers: true,
-					}
-					go func() {
-						completionFuture := &future{}
-						completionFuture.cancelRequested.Store(true)
-						e.workItems <- workItem{
-							workItemId: 0,
-							runnable: func() {
-								time.Sleep(time.Millisecond * 70)
-							},
-							completionFuture: completionFuture,
-						}
-					}()
-					return e
-				}(),
-			},
-			timeBeforeManipulation: 50 * time.Millisecond,
-			manipulator: func(w *worker) {
-				w.shutdown.Store(true)
-				w.interrupter <- struct{}{}
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			w := &worker{
-				id:          tt.fields.id,
-				interrupter: make(chan struct{}, 1),
-				executor:    tt.fields.executor,
-			}
-			go w.work()
-			if tt.firstValidation != nil {
-				time.Sleep(tt.timeBeforeFirstValidation)
-				t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
-				tt.firstValidation(t, w)
-			}
-			if tt.manipulator != nil {
-				time.Sleep(tt.timeBeforeManipulation)
-				t.Logf("manipulator after %v", tt.timeBeforeManipulation)
-				tt.manipulator(w)
-			}
-			if tt.secondValidation != nil {
-				time.Sleep(tt.timeBeforeSecondValidation)
-				t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
-				tt.secondValidation(t, w)
-			}
-		})
-	}
-}
-
-func Test_future_AwaitCompletion(t *testing.T) {
-	type args struct {
-		ctx context.Context
-	}
-	tests := []struct {
-		name      string
-		args      args
-		completer func(*future)
-		wantErr   assert.ErrorAssertionFunc
-	}{
-		{
-			name: "completes with error",
-			args: args{ctx: context.TODO()},
-			completer: func(f *future) {
-				f.Cancel(false, errors.New("Uh oh"))
-			},
-			wantErr: assert.Error,
-		},
-		{
-			name: "completes regular",
-			args: args{ctx: context.TODO()},
-			completer: func(f *future) {
-				time.Sleep(time.Millisecond * 30)
-				f.complete()
-			},
-			wantErr: assert.NoError,
-		},
-		{
-			name: "completes not int time",
-			args: args{ctx: func() context.Context {
-				deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Millisecond))
-				t.Cleanup(cancel)
-				return deadline
-			}()},
-			completer: func(f *future) {
-				time.Sleep(time.Millisecond * 300)
-			},
-			wantErr: assert.Error,
-		},
-		{
-			name: "completes canceled without error",
-			args: args{ctx: context.TODO()},
-			completer: func(f *future) {
-				time.Sleep(time.Millisecond * 300)
-				f.Cancel(true, nil)
-			},
-			wantErr: func(t assert.TestingT, err error, i ...any) bool {
-				assert.Same(t, Canceled, err)
-				return true
-			},
-		},
-		{
-			name: "completes canceled with particular error",
-			args: args{ctx: context.TODO()},
-			completer: func(f *future) {
-				time.Sleep(time.Millisecond * 300)
-				f.Cancel(true, errors.New("Uh oh"))
-			},
-			wantErr: func(t assert.TestingT, err error, i ...any) bool {
-				assert.Equal(t, "Uh oh", err.Error())
-				return true
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			f := &future{}
-			go tt.completer(f)
-			tt.wantErr(t, f.AwaitCompletion(tt.args.ctx), fmt.Sprintf("AwaitCompletion(%v)", tt.args.ctx))
-		})
-	}
-}
-
-func Test_future_Cancel(t *testing.T) {
-	type args struct {
-		interrupt bool
-		err       error
-	}
-	tests := []struct {
-		name     string
-		args     args
-		verifier func(*testing.T, *future)
-	}{
-		{
-			name: "cancel cancels",
-			verifier: func(t *testing.T, f *future) {
-				assert.True(t, f.cancelRequested.Load())
-			},
-		},
-		{
-			name: "cancel with interrupt",
-			args: args{
-				interrupt: true,
-				err:       nil,
-			},
-			verifier: func(t *testing.T, f *future) {
-				assert.True(t, f.cancelRequested.Load())
-				assert.False(t, f.errored.Load())
-				assert.Nil(t, f.err.Load())
-			},
-		},
-		{
-			name: "cancel with err",
-			args: args{
-				interrupt: true,
-				err:       errors.New("Uh Oh"),
-			},
-			verifier: func(t *testing.T, f *future) {
-				assert.True(t, f.cancelRequested.Load())
-				assert.True(t, f.errored.Load())
-				assert.NotNil(t, f.err.Load())
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			f := &future{}
-			f.Cancel(tt.args.interrupt, tt.args.err)
-			tt.verifier(t, f)
-		})
-	}
-}
-
-func Test_future_complete(t *testing.T) {
-	tests := []struct {
-		name     string
-		verifier func(*testing.T, *future)
-	}{
-		{
-			name: "complete completes",
-			verifier: func(t *testing.T, f *future) {
-				assert.True(t, f.completed.Load())
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			f := &future{}
-			f.complete()
-			tt.verifier(t, f)
-		})
-	}
-}
-
-func Test_dynamicExecutor_Start(t *testing.T) {
-	type fields struct {
-		executor           *executor
-		maxNumberOfWorkers int
-	}
-	tests := []struct {
-		name       string
-		fields     fields
-		setup      func(t *testing.T, fields *fields)
-		startTwice bool
-	}{
-		{
-			name: "just start",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-		},
-		{
-			name: "start twice",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-			startTwice: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if tt.setup != nil {
-				tt.setup(t, &tt.fields)
-			}
-			e := &dynamicExecutor{
-				executor:           tt.fields.executor,
-				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
-			}
-			e.Start()
-			if tt.startTwice {
-				e.Start()
-			}
-			// Let it work a bit
-			time.Sleep(20 * time.Millisecond)
-			t.Log("done with test")
-			t.Cleanup(e.Stop)
-		})
-	}
-}
-
-func Test_dynamicExecutor_Stop(t *testing.T) {
-	type fields struct {
-		executor           *executor
-		maxNumberOfWorkers int
-		interrupter        chan struct{}
-	}
-	tests := []struct {
-		name      string
-		fields    fields
-		setup     func(t *testing.T, fields *fields)
-		startIt   bool
-		stopTwice bool
-	}{
-		{
-			name: "just stop",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-		},
-		{
-			name: "stop started",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-		},
-		{
-			name: "stop twice",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-			stopTwice: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if tt.setup != nil {
-				tt.setup(t, &tt.fields)
-			}
-			e := &dynamicExecutor{
-				executor:           tt.fields.executor,
-				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
-				interrupter:        tt.fields.interrupter,
-			}
-			if tt.startIt {
-				e.Start()
-			}
-			e.Stop()
-			if tt.stopTwice {
-				e.Stop()
-			}
-		})
-	}
-}
-
 // from: https://github.com/golang/go/issues/36532#issuecomment-575535452
 func testContext(t *testing.T) context.Context {
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
new file mode 100644
index 0000000000..22f8d20e5b
--- /dev/null
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"github.com/apache/plc4x/plc4go/spi/utils"
+	"github.com/rs/zerolog"
+	"runtime/debug"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+var upScaleInterval = 100 * time.Millisecond
+var downScaleInterval = 5 * time.Second
+var timeToBecomeUnused = 5 * time.Second
+
+type dynamicExecutor struct {
+	*executor
+
+	maxNumberOfWorkers     int
+	currentNumberOfWorkers atomic.Int32
+	dynamicStateChange     sync.Mutex
+	interrupter            chan struct{}
+
+	dynamicWorkers sync.WaitGroup
+}
+
+func (e *dynamicExecutor) Start() {
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	if e.interrupter != nil {
+		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
+		close(e.interrupter)
+		e.dynamicWorkers.Wait()
+	}
+
+	e.executor.Start()
+	mutex := sync.Mutex{}
+	e.interrupter = make(chan struct{})
+	// Worker spawner
+	go func() {
+		e.dynamicWorkers.Add(1)
+		defer e.dynamicWorkers.Done()
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "spawner").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Trace().Msg("running")
+			mutex.Lock()
+			numberOfItemsInQueue := len(e.workItems)
+			numberOfWorkers := len(e.worker)
+			workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+				workerLog.Trace().Msg("spawning new worker")
+				_worker := &worker{
+					id:           numberOfWorkers - 1,
+					interrupter:  make(chan struct{}, 1),
+					executor:     e,
+					lastReceived: time.Now(),
+					log:          e.log,
+				}
+				e.worker = append(e.worker, _worker)
+				_worker.initialize()
+				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
+				go _worker.work()
+				e.currentNumberOfWorkers.Add(1)
+			} else {
+				workerLog.Trace().Msg("Nothing to scale")
+			}
+			mutex.Unlock()
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+				timer := time.NewTimer(upScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
+		}
+		workerLog.Info().Msg("Terminated")
+	}()
+	// Worker killer
+	go func() {
+		e.dynamicWorkers.Add(1)
+		defer e.dynamicWorkers.Done()
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "killer").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Trace().Msg("running")
+			mutex.Lock()
+			newWorkers := make([]*worker, 0)
+			for _, _worker := range e.worker {
+				deadline := time.Now().Add(-timeToBecomeUnused)
+				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+				if _worker.lastReceived.Before(deadline) {
+					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
+					_worker.interrupted.Store(true)
+					close(_worker.interrupter)
+					e.currentNumberOfWorkers.Add(-1)
+				} else {
+					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
+					newWorkers = append(newWorkers, _worker)
+				}
+			}
+			e.worker = newWorkers
+			mutex.Unlock()
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+				timer := time.NewTimer(downScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
+		}
+		workerLog.Info().Msg("Terminated")
+	}()
+}
+
+func (e *dynamicExecutor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if !e.running || e.shutdown {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	close(e.interrupter)
+	e.log.Trace().Msg("stopping inner executor")
+	e.executor.Stop()
+	e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
+	e.dynamicWorkers.Wait()
+	e.log.Trace().Msg("stopped")
+}
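[editor note] The scaling policy encoded in this file: a spawner goroutine wakes every upScaleInterval (100ms) and adds a worker while the queue holds more items than there are workers and maxNumberOfWorkers is not yet reached; a killer goroutine wakes every downScaleInterval (5s) and reaps workers that have received no item for timeToBecomeUnused (5s). A sketch of driving it, under the same interface assumptions as the example above:

package main

import (
	"context"

	"github.com/apache/plc4x/plc4go/spi/pool"
)

func main() {
	// At most 8 workers; a burst of 32 items lets the spawner scale up,
	// and sustained idleness afterwards lets the killer scale back down.
	executor := pool.NewDynamicExecutor(8, 32)
	executor.Start()
	defer executor.Stop() // Stop also terminates the spawner/killer goroutines

	futures := make([]pool.CompletionFuture, 0, 32)
	for i := int32(0); i < 32; i++ {
		futures = append(futures, executor.Submit(context.Background(), i, func() {}))
	}
	for _, f := range futures {
		_ = f.AwaitCompletion(context.Background())
	}
}
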
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
new file mode 100644
index 0000000000..1db92a5d7f
--- /dev/null
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"testing"
+	"time"
+)
+
+func Test_dynamicExecutor_Start(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+	}
+	tests := []struct {
+		name       string
+		fields     fields
+		setup      func(t *testing.T, fields *fields)
+		startTwice bool
+	}{
+		{
+			name: "just start",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "start twice",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			startTwice: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+			}
+			e.Start()
+			if tt.startTwice {
+				e.Start()
+			}
+			// Let it work a bit
+			time.Sleep(20 * time.Millisecond)
+			t.Log("done with test")
+			t.Cleanup(e.Stop)
+		})
+	}
+}
+
+func Test_dynamicExecutor_Stop(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+		interrupter        chan struct{}
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		setup     func(t *testing.T, fields *fields)
+		startIt   bool
+		stopTwice bool
+	}{
+		{
+			name: "just stop",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "stop started",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "stop twice",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			stopTwice: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+				interrupter:        tt.fields.interrupter,
+			}
+			if tt.startIt {
+				e.Start()
+			}
+			e.Stop()
+			if tt.stopTwice {
+				e.Stop()
+			}
+		})
+	}
+}
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
new file mode 100644
index 0000000000..79bddb6dc0
--- /dev/null
+++ b/plc4go/spi/pool/executor.go
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+
+	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
+)
+
+type executor struct {
+	running      bool
+	shutdown     bool
+	stateChange  sync.Mutex
+	worker       []*worker
+	queueDepth   int
+	workItems    chan workItem
+	traceWorkers bool
+
+	workerWaitGroup sync.WaitGroup
+
+	log zerolog.Logger
+}
+
+func (e *executor) isTraceWorkers() bool {
+	return e.traceWorkers
+}
+
+func (e *executor) getWorksItems() chan workItem {
+	return e.workItems
+}
+
+func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
+	return &e.workerWaitGroup
+}
+
+func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
+	if runnable == nil {
+		value := atomic.Value{}
+		value.Store(errors.New("runnable must not be nil"))
+		return &future{err: value}
+	}
+	e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
+	completionFuture := &future{}
+	if e.shutdown {
+		completionFuture.Cancel(false, errors.New("executor in shutdown"))
+		return completionFuture
+	}
+	select {
+	case e.workItems <- workItem{
+		workItemId:       workItemId,
+		runnable:         runnable,
+		completionFuture: completionFuture,
+	}:
+		e.log.Trace().Msg("Item added")
+	case <-ctx.Done():
+		completionFuture.Cancel(false, ctx.Err())
+	}
+
+	e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
+	return completionFuture
+}
+
+func (e *executor) Start() {
+	e.stateChange.Lock()
+	defer e.stateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	e.running = true
+	e.shutdown = false
+	for i := 0; i < len(e.worker); i++ {
+		worker := e.worker[i]
+		worker.initialize()
+		go worker.work()
+	}
+}
+
+func (e *executor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.stateChange.Lock()
+	defer e.stateChange.Unlock()
+	if !e.running || e.shutdown {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	e.shutdown = true
+	for i := 0; i < len(e.worker); i++ {
+		worker := e.worker[i]
+		worker.shutdown.Store(true)
+		worker.interrupted.Store(true)
+		close(worker.interrupter)
+	}
+	e.running = false
+	e.shutdown = false
+	e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
+	e.workerWaitGroup.Wait()
+	e.log.Trace().Msg("stopped")
+}
+
+func (e *executor) Close() error {
+	e.Stop()
+	return nil
+}
+
+func (e *executor) IsRunning() bool {
+	return e.running && !e.shutdown
+}
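[editor note] Two properties of this implementation are easy to miss: Submit only queues the item (the worker, not Submit, completes the future), and AwaitCompletion polls in roughly 10ms steps while honoring its context, so a deadline shorter than the work surfaces as the context error rather than a pool error. A small sketch, again assuming the exported constructor and Submit shown above:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/apache/plc4x/plc4go/spi/pool"
)

func main() {
	executor := pool.NewFixedSizeExecutor(1, 1)
	executor.Start()
	defer executor.Close()

	f := executor.Submit(context.Background(), 1, func() {
		time.Sleep(100 * time.Millisecond) // simulate slow work
	})

	// A deadline shorter than the work returns the context error.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	if err := f.AwaitCompletion(ctx); err != nil {
		fmt.Println("not done yet:", err) // context.DeadlineExceeded
	}

	// Without a deadline the same future completes normally.
	if err := f.AwaitCompletion(context.Background()); err == nil {
		fmt.Println("completed")
	}
}
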
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
new file mode 100644
index 0000000000..2a43b1366e
--- /dev/null
+++ b/plc4go/spi/pool/executor_test.go
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"fmt"
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/assert"
+	"sync"
+	"testing"
+	"time"
+)
+
+func Test_executor_Close(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{
+			name:    "close it",
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			tt.wantErr(t, e.Close(), fmt.Sprintf("Close()"))
+		})
+	}
+}
+
+func Test_executor_IsRunning(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   bool
+	}{
+		{
+			name: "no",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.IsRunning(), "IsRunning()")
+		})
+	}
+}
+
+func Test_executor_Start(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		shouldRun bool
+	}{
+		{
+			name:      "Start fresh",
+			shouldRun: true,
+		},
+		{
+			name: "Start running",
+			fields: fields{
+				running: true,
+			},
+			shouldRun: true,
+		},
+		{
+			name: "Start stopping",
+			fields: fields{
+				running:  true,
+				shutdown: true,
+			},
+			shouldRun: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				workItems:    tt.fields.queue,
+				traceWorkers: tt.fields.traceWorkers,
+			}
+			e.Start()
+			assert.Equal(t, tt.shouldRun, e.IsRunning(), "should be running")
+		})
+	}
+}
+
+func Test_executor_Stop(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		shouldRun bool
+	}{
+		{
+			name:      "Stop stopped",
+			shouldRun: false,
+		},
+		{
+			name: "Stop running",
+			fields: fields{
+				running: true,
+				queue:   make(chan workItem),
+				worker: []*worker{
+					func() *worker {
+						w := &worker{}
+						w.initialize()
+						return w
+					}(),
+				},
+			},
+			shouldRun: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				workItems:    tt.fields.queue,
+				traceWorkers: tt.fields.traceWorkers,
+			}
+			e.Stop()
+		})
+	}
+}
+
+func Test_executor_Submit(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	type args struct {
+		workItemId int32
+		runnable   Runnable
+		context    context.Context
+	}
+	tests := []struct {
+		name                      string
+		fields                    fields
+		args                      args
+		completionFutureValidator func(t *testing.T, future CompletionFuture) bool
+		waitForCompletion         bool
+	}{
+		{
+			name: "submitting nothing",
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				return assert.Error(t, completionFuture.(*future).err.Load().(error))
+			},
+		},
+		{
+			name: "submit canceled",
+			fields: fields{
+				queue: make(chan workItem, 0),
+			},
+			args: args{
+				workItemId: 13,
+				runnable: func() {
+					// We do something for 3 seconds
+					<-time.NewTimer(3 * time.Second).C
+				},
+				context: func() context.Context {
+					ctx, cancelFunc := context.WithCancel(context.Background())
+					cancelFunc()
+					return ctx
+				}(),
+			},
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				err := completionFuture.(*future).err.Load().(error)
+				return assert.Error(t, err)
+			},
+		},
+		{
+			name: "Submit something which doesn't complete",
+			fields: fields{
+				queue: make(chan workItem, 1),
+			},
+			args: args{
+				workItemId: 13,
+				runnable: func() {
+					// We do something for 3 seconds
+					<-time.NewTimer(3 * time.Second).C
+				},
+				context: context.TODO(),
+			},
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				completed := completionFuture.(*future).completed.Load()
+				return assert.False(t, completed)
+			},
+		},
+		{
+			name: "Submit something which does complete",
+			fields: func() fields {
+				var executor = NewFixedSizeExecutor(1, 1).(*executor)
+				return fields{
+					running:      executor.running,
+					shutdown:     executor.shutdown,
+					worker:       executor.worker,
+					queue:        executor.workItems,
+					traceWorkers: true,
+				}
+			}(),
+			args: args{
+				workItemId: 13,
+				runnable: func() {
+					// NOOP
+				},
+				context: context.TODO(),
+			},
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				completed := completionFuture.(*future).completed.Load()
+				return assert.True(t, completed)
+			},
+			waitForCompletion: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				workItems:    tt.fields.queue,
+				traceWorkers: tt.fields.traceWorkers,
+			}
+			e.Start()
+			completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
+			if tt.waitForCompletion {
+				assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
+			}
+			assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
+		})
+	}
+}
+
+func Test_executor_getWorkerWaitGroup(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   *sync.WaitGroup
+	}{
+		{
+			name: "get it",
+			want: &sync.WaitGroup{},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.getWorkerWaitGroup(), "getWorkerWaitGroup()")
+		})
+	}
+}
+
+func Test_executor_getWorksItems(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   chan workItem
+	}{
+		{
+			name: "get it",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.getWorksItems(), "getWorksItems()")
+		})
+	}
+}
+
+func Test_executor_isTraceWorkers(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   bool
+	}{
+		{
+			name: "it is not",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.isTraceWorkers(), "isTraceWorkers()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/workItem.go b/plc4go/spi/pool/workItem.go
new file mode 100644
index 0000000000..942480bc1a
--- /dev/null
+++ b/plc4go/spi/pool/workItem.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import "fmt"
+
+type workItem struct {
+	workItemId       int32
+	runnable         Runnable
+	completionFuture *future
+}
+
+func (w workItem) String() string {
+	return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
+}
diff --git a/plc4go/spi/pool/workItem_test.go b/plc4go/spi/pool/workItem_test.go
new file mode 100644
index 0000000000..ebc4b30c5c
--- /dev/null
+++ b/plc4go/spi/pool/workItem_test.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func Test_workItem_String(t *testing.T) {
+	type fields struct {
+		workItemId       int32
+		runnable         Runnable
+		completionFuture *future
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "Simple test",
+			want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &workItem{
+				workItemId:       tt.fields.workItemId,
+				runnable:         tt.fields.runnable,
+				completionFuture: tt.fields.completionFuture,
+			}
+			assert.Equalf(t, tt.want, w.String(), "String()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
new file mode 100644
index 0000000000..7530ed0ad9
--- /dev/null
+++ b/plc4go/spi/pool/worker.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"runtime/debug"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/rs/zerolog"
+)
+
+type worker struct {
+	id          int
+	shutdown    atomic.Bool
+	interrupted atomic.Bool
+	interrupter chan struct{}
+	executor    interface {
+		isTraceWorkers() bool
+		getWorksItems() chan workItem
+		getWorkerWaitGroup() *sync.WaitGroup
+	}
+	hasEnded     atomic.Bool
+	lastReceived time.Time
+
+	log zerolog.Logger
+}
+
+func (w *worker) initialize() {
+	w.shutdown.Store(false)
+	w.interrupted.Store(false)
+	w.interrupter = make(chan struct{}, 1)
+	w.hasEnded.Store(false)
+	w.lastReceived = time.Now()
+}
+
+func (w *worker) work() {
+	w.executor.getWorkerWaitGroup().Add(1)
+	defer w.executor.getWorkerWaitGroup().Done()
+	defer func() {
+		if err := recover(); err != nil {
+			w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+		}
+		if !w.shutdown.Load() {
+			// if we are not in shutdown we continue
+			w.work()
+		}
+	}()
+	workerLog := w.log.With().Int("Worker id", w.id).Logger()
+	if !w.executor.isTraceWorkers() {
+		workerLog = zerolog.Nop()
+	}
+	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
+	w.hasEnded.Store(false)
+	workerLog.Debug().Msgf("setting to not ended")
+
+	for !w.shutdown.Load() {
+		workerLog.Debug().Msg("Working")
+		select {
+		case _workItem := <-w.executor.getWorksItems():
+			w.lastReceived = time.Now()
+			workerLog.Debug().Msgf("Got work item %v", _workItem)
+			if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
+				workerLog.Debug().Msg("We need to stop")
+			// TODO: do we need to complete with an error?
+			} else {
+				workerLog.Debug().Msgf("Running work item %v", _workItem)
+				_workItem.runnable()
+				_workItem.completionFuture.complete()
+				workerLog.Debug().Msgf("work item %v completed", _workItem)
+			}
+		case <-w.interrupter:
+			workerLog.Debug().Msg("We got interrupted")
+		}
+	}
+	w.hasEnded.Store(true)
+	workerLog.Debug().Msg("setting to ended")
+}
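[editor note] worker.work uses a recover-and-restart pattern: the deferred function logs a panic raised by a work item and, as long as no shutdown was requested, re-enters work, so a panicking item never silently kills the worker. A distilled, self-contained sketch of just that pattern (loopWorker and its fields are illustrative names, not part of the pool package):

package main

import (
	"fmt"
	"sync/atomic"
)

type loopWorker struct {
	shutdown atomic.Bool
	jobs     chan func()
}

func (w *loopWorker) work() {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("recovered from job panic:", err)
		}
		if !w.shutdown.Load() {
			w.work() // restart the loop; a panicking job must not kill the worker
		}
	}()
	for !w.shutdown.Load() {
		job := <-w.jobs
		job()
	}
}

func main() {
	w := &loopWorker{jobs: make(chan func(), 3)}
	go w.work()
	w.jobs <- func() { panic("boom") }
	w.jobs <- func() { fmt.Println("still alive after panic") }
	done := make(chan struct{})
	w.jobs <- func() { w.shutdown.Store(true); close(done) }
	<-done // jobs run in order, so both prints happen before this unblocks
}
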
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
new file mode 100644
index 0000000000..4f604d2efb
--- /dev/null
+++ b/plc4go/spi/pool/worker_test.go
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_worker_initialize(t *testing.T) {
+	type fields struct {
+		id          int
+		interrupter chan struct{}
+		executor    interface {
+			isTraceWorkers() bool
+			getWorksItems() chan workItem
+			getWorkerWaitGroup() *sync.WaitGroup
+		}
+		lastReceived time.Time
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+	}{
+		{
+			name: "do it",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:           tt.fields.id,
+				interrupter:  tt.fields.interrupter,
+				executor:     tt.fields.executor,
+				lastReceived: tt.fields.lastReceived,
+				log:          tt.fields.log,
+			}
+			w.initialize()
+		})
+	}
+}
+
+func Test_worker_work(t *testing.T) {
+	type fields struct {
+		id       int
+		executor *executor
+	}
+	tests := []struct {
+		name                       string
+		fields                     fields
+		timeBeforeFirstValidation  time.Duration
+		firstValidation            func(*testing.T, *worker)
+		timeBeforeManipulation     time.Duration
+		manipulator                func(*worker)
+		timeBeforeSecondValidation time.Duration
+		secondValidation           func(*testing.T, *worker)
+	}{
+		{
+			name: "Worker should work till shutdown (even if it panics)",
+			fields: fields{
+				id: 0,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
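+						// feed a single item whose runnable panics; the worker
+						// must recover and keep working until shutdown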
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								panic("Oh no what should I do???")
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			name: "Worker should work till shutdown",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								time.Sleep(time.Millisecond * 70)
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			name: "Work interrupted",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			name: "Work on canceled",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
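+						// the item arrives with cancellation already requested,
+						// so the worker must skip running it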
+						completionFuture := &future{}
+						completionFuture.cancelRequested.Store(true)
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								time.Sleep(time.Millisecond * 70)
+							},
+							completionFuture: completionFuture,
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeManipulation: 50 * time.Millisecond,
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:          tt.fields.id,
+				interrupter: make(chan struct{}, 1),
+				executor:    tt.fields.executor,
+			}
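+			// run the worker loop in the background and drive it through the
+			// timed validation/manipulation phases of the test case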
+			go w.work()
+			if tt.firstValidation != nil {
+				time.Sleep(tt.timeBeforeFirstValidation)
+				t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
+				tt.firstValidation(t, w)
+			}
+			if tt.manipulator != nil {
+				time.Sleep(tt.timeBeforeManipulation)
+				t.Logf("manipulator after %v", tt.timeBeforeManipulation)
+				tt.manipulator(w)
+			}
+			if tt.secondValidation != nil {
+				time.Sleep(tt.timeBeforeSecondValidation)
+				t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
+				tt.secondValidation(t, w)
+			}
+		})
+	}
+}