You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/13 12:17:23 UTC

[plc4x] 02/04: refactor(plc4go/spi): split up pool into multiple files

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit ade510700aab3ef7200064501bd2f01152919aa9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 12:44:07 2023 +0200

    refactor(plc4go/spi): split up pool into multiple files
---
 plc4go/spi/pool/CompletionFuture.go      |  90 ++++
 plc4go/spi/pool/CompletionFuture_test.go | 191 +++++++++
 plc4go/spi/pool/WorkerPool.go            | 393 +-----------------
 plc4go/spi/pool/WorkerPool_test.go       | 689 -------------------------------
 plc4go/spi/pool/dynamicExecutor.go       | 173 ++++++++
 plc4go/spi/pool/dynamicExecutor_test.go  | 170 ++++++++
 plc4go/spi/pool/executor.go              | 129 ++++++
 plc4go/spi/pool/executor_test.go         | 413 ++++++++++++++++++
 plc4go/spi/pool/workItem.go              |  32 ++
 plc4go/spi/pool/workItem_test.go         |  53 +++
 plc4go/spi/pool/worker.go                |  96 +++++
 plc4go/spi/pool/worker_test.go           | 227 ++++++++++
 12 files changed, 1585 insertions(+), 1071 deletions(-)

diff --git a/plc4go/spi/pool/CompletionFuture.go b/plc4go/spi/pool/CompletionFuture.go
new file mode 100644
index 0000000000..5d11002356
--- /dev/null
+++ b/plc4go/spi/pool/CompletionFuture.go
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/pkg/errors"
+)
+
+type CompletionFuture interface {
+	AwaitCompletion(ctx context.Context) error // waits for the outcome and returns the resulting error, if any
+	Cancel(interrupt bool, err error)          // requests cancellation; a non-nil err is what waiters get back
+}
+
+type future struct {
+	cancelRequested    atomic.Bool  // set by Cancel
+	interruptRequested atomic.Bool  // interrupt argument of the latest Cancel call
+	completed          atomic.Bool  // set by complete
+	errored            atomic.Bool  // set when Cancel received a non-nil error
+	err                atomic.Value // error reported by AwaitCompletion, if one was stored
+}
+
+func (f *future) Cancel(interrupt bool, err error) {
+	if err != nil {
+		f.err.Store(err) // publish the error before any flag that releases AwaitCompletion
+		f.errored.Store(true)
+	}
+	f.interruptRequested.Store(interrupt)
+	f.cancelRequested.Store(true) // last, so a waiter woken by it already sees err and interrupt
+}
+
+func (f *future) complete() {
+	f.completed.Store(true) // lets AwaitCompletion stop waiting
+}
+
+// Canceled is the error CompletionFuture.AwaitCompletion returns when the future was canceled without a specific error
+var Canceled = errors.New("Canceled")
+
+func (f *future) AwaitCompletion(ctx context.Context) error {
+	for !(f.completed.Load() || f.errored.Load() || f.cancelRequested.Load()) && ctx.Err() == nil {
+		time.Sleep(10 * time.Millisecond) // re-check the flags and the context every 10ms
+	}
+	if ctxErr := ctx.Err(); ctxErr != nil {
+		return ctxErr // a finished context is reported before the future's own state
+	}
+	if stored, isErr := f.err.Load().(error); isErr {
+		return stored
+	}
+	if !f.cancelRequested.Load() {
+		return nil
+	}
+	return Canceled
+}
+
+func (f *future) String() string {
+	const layout = "future{\n" +
+		"\tcancelRequested: %t,\n" +
+		"\tinterruptRequested: %t,\n" +
+		"\tcompleted: %t,\n" +
+		"\terrored: %t,\n" +
+		"\terr: %v,\n" +
+		"}"
+	cancelRequested := f.cancelRequested.Load()
+	interruptRequested := f.interruptRequested.Load()
+	completed := f.completed.Load()
+	errored := f.errored.Load()
+	storedErr := f.err.Load()
+	return fmt.Sprintf(layout, cancelRequested, interruptRequested, completed, errored, storedErr)
+}
diff --git a/plc4go/spi/pool/CompletionFuture_test.go b/plc4go/spi/pool/CompletionFuture_test.go
new file mode 100644
index 0000000000..79fdaeace7
--- /dev/null
+++ b/plc4go/spi/pool/CompletionFuture_test.go
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"fmt"
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	"time"
+)
+
+// Test_future_AwaitCompletion drives AwaitCompletion through completion, context expiry and
+// cancellation (with and without an error); each completer runs in its own goroutine.
+func Test_future_AwaitCompletion(t *testing.T) {
+	type args struct {
+		ctx context.Context
+	}
+	tests := []struct {
+		name      string
+		args      args
+		completer func(*future)
+		wantErr   assert.ErrorAssertionFunc
+	}{
+		{
+			name: "completes with error",
+			args: args{ctx: context.TODO()},
+			completer: func(f *future) {
+				f.Cancel(false, errors.New("Uh oh"))
+			},
+			wantErr: assert.Error,
+		},
+		{
+			name: "completes regular",
+			args: args{ctx: context.TODO()},
+			completer: func(f *future) {
+				time.Sleep(time.Millisecond * 30)
+				f.complete()
+			},
+			wantErr: assert.NoError,
+		},
+		{
+			name: "completes not in time",
+			args: args{ctx: func() context.Context { // built eagerly: the 30ms start when the table is created
+				deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Millisecond))
+				t.Cleanup(cancel)
+				return deadline
+			}()},
+			completer: func(f *future) {
+				time.Sleep(time.Millisecond * 300)
+			},
+			wantErr: assert.Error,
+		},
+		{
+			name: "completes canceled without error",
+			args: args{ctx: context.TODO()},
+			completer: func(f *future) {
+				time.Sleep(time.Millisecond * 300)
+				f.Cancel(true, nil)
+			},
+			wantErr: func(t assert.TestingT, err error, i ...any) bool {
+				return assert.Same(t, Canceled, err)
+			},
+		},
+		{
+			name: "completes canceled with particular error",
+			args: args{ctx: context.TODO()},
+			completer: func(f *future) {
+				time.Sleep(time.Millisecond * 300)
+				f.Cancel(true, errors.New("Uh oh"))
+			},
+			wantErr: func(t assert.TestingT, err error, i ...any) bool {
+				return assert.EqualError(t, err, "Uh oh") // fails cleanly instead of panicking on a nil error
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			f := &future{}
+			go tt.completer(f)
+			tt.wantErr(t, f.AwaitCompletion(tt.args.ctx), fmt.Sprintf("AwaitCompletion(%v)", tt.args.ctx))
+		})
+	}
+}
+
+// Test_future_Cancel checks the flags and stored error that Cancel leaves on a fresh
+// future for the zero-value, interrupt-only and interrupt-with-error argument sets.
+func Test_future_Cancel(t *testing.T) {
+	type args struct {
+		interrupt bool
+		err       error
+	}
+	type testCase struct {
+		name     string
+		args     args
+		verifier func(*testing.T, *future)
+	}
+
+	cases := []testCase{
+		{
+			name: "cancel cancels",
+			args: args{},
+			verifier: func(t *testing.T, fut *future) {
+				assert.True(t, fut.cancelRequested.Load())
+			},
+		},
+		{
+			name: "cancel with interrupt",
+			args: args{interrupt: true, err: nil},
+			verifier: func(t *testing.T, fut *future) {
+				assert.True(t, fut.cancelRequested.Load())
+				assert.False(t, fut.errored.Load())
+				assert.Nil(t, fut.err.Load())
+			},
+		},
+		{
+			name: "cancel with err",
+			args: args{interrupt: true, err: errors.New("Uh Oh")},
+			verifier: func(t *testing.T, fut *future) {
+				assert.True(t, fut.cancelRequested.Load())
+				assert.True(t, fut.errored.Load())
+				assert.NotNil(t, fut.err.Load())
+			},
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			fut := new(future)
+			interrupt, cancelErr := tc.args.interrupt, tc.args.err
+			fut.Cancel(interrupt, cancelErr)
+			tc.verifier(t, fut)
+		})
+	}
+}
+
+// Test_future_String checks the multi-line rendering of a zero-value future.
+func Test_future_String(t *testing.T) {
+	type testCase struct {
+		name string
+		want string
+	}
+	cases := []testCase{
+		{name: "string it", want: "future{\n\tcancelRequested: false,\n\tinterruptRequested: false,\n\tcompleted: false,\n\terrored: false,\n\terr: <nil>,\n}"},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			fut := new(future)
+			rendered := fut.String()
+			assert.Equalf(t, tc.want, rendered, "String()")
+		})
+	}
+}
+
+// Test_future_complete verifies that complete sets the completed flag on a fresh future.
+func Test_future_complete(t *testing.T) {
+	type testCase struct {
+		name     string
+		verifier func(*testing.T, *future)
+	}
+	cases := []testCase{
+		{
+			name:     "complete completes",
+			verifier: func(t *testing.T, fut *future) { assert.True(t, fut.completed.Load()) },
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			fut := new(future)
+			fut.complete()
+			tc.verifier(t, fut)
+		})
+	}
+}
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 85f3ef8064..d83e770dc9 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -21,97 +21,13 @@ package pool
 
 import (
 	"context"
-	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/apache/plc4x/plc4go/spi/utils"
-	"github.com/pkg/errors"
-	"github.com/rs/zerolog"
 	"io"
-	"runtime/debug"
-	"sync"
-	"sync/atomic"
 	"time"
 )
 
 type Runnable func()
 
-type worker struct {
-	id          int
-	shutdown    atomic.Bool
-	interrupted atomic.Bool
-	interrupter chan struct{}
-	executor    interface {
-		isTraceWorkers() bool
-		getWorksItems() chan workItem
-		getWorkerWaitGroup() *sync.WaitGroup
-	}
-	hasEnded     atomic.Bool
-	lastReceived time.Time
-
-	log zerolog.Logger
-}
-
-func (w *worker) initialize() {
-	w.shutdown.Store(false)
-	w.interrupted.Store(false)
-	w.interrupter = make(chan struct{}, 1)
-	w.hasEnded.Store(false)
-	w.lastReceived = time.Now()
-}
-
-func (w *worker) work() {
-	w.executor.getWorkerWaitGroup().Add(1)
-	defer w.executor.getWorkerWaitGroup().Done()
-	defer func() {
-		if err := recover(); err != nil {
-			w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
-		}
-		if !w.shutdown.Load() {
-			// if we are not in shutdown we continue
-			w.work()
-		}
-	}()
-	workerLog := w.log.With().Int("Worker id", w.id).Logger()
-	if !w.executor.isTraceWorkers() {
-		workerLog = zerolog.Nop()
-	}
-	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
-	w.hasEnded.Store(false)
-	workerLog.Debug().Msgf("setting to not ended")
-
-	for !w.shutdown.Load() {
-		workerLog.Debug().Msg("Working")
-		select {
-		case _workItem := <-w.executor.getWorksItems():
-			w.lastReceived = time.Now()
-			workerLog.Debug().Msgf("Got work item %v", _workItem)
-			if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
-				workerLog.Debug().Msg("We need to stop")
-				// TODO: do we need to complete with a error?
-			} else {
-				workerLog.Debug().Msgf("Running work item %v", _workItem)
-				_workItem.runnable()
-				_workItem.completionFuture.complete()
-				workerLog.Debug().Msgf("work item %v completed", _workItem)
-			}
-		case <-w.interrupter:
-			workerLog.Debug().Msg("We got interrupted")
-		}
-	}
-	w.hasEnded.Store(true)
-	workerLog.Debug().Msg("setting to ended")
-}
-
-type workItem struct {
-	workItemId       int32
-	runnable         Runnable
-	completionFuture *future
-}
-
-func (w workItem) String() string {
-	return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
-}
-
 type Executor interface {
 	io.Closer
 	Start()
@@ -120,43 +36,6 @@ type Executor interface {
 	IsRunning() bool
 }
 
-type executor struct {
-	running      bool
-	shutdown     bool
-	stateChange  sync.Mutex
-	worker       []*worker
-	queueDepth   int
-	workItems    chan workItem
-	traceWorkers bool
-
-	workerWaitGroup sync.WaitGroup
-
-	log zerolog.Logger
-}
-
-func (e *executor) isTraceWorkers() bool {
-	return e.traceWorkers
-}
-
-func (e *executor) getWorksItems() chan workItem {
-	return e.workItems
-}
-
-func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
-	return &e.workerWaitGroup
-}
-
-type dynamicExecutor struct {
-	*executor
-
-	maxNumberOfWorkers     int
-	currentNumberOfWorkers atomic.Int32
-	dynamicStateChange     sync.Mutex
-	interrupter            chan struct{}
-
-	dynamicWorkers sync.WaitGroup
-}
-
 func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	workers := make([]*worker, numberOfWorkers)
 	customLogger := options.ExtractCustomLogger(_options...)
@@ -184,10 +63,6 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
 	return _executor
 }
 
-var upScaleInterval = 100 * time.Millisecond
-var downScaleInterval = 5 * time.Second
-var timeToBecomeUnused = 5 * time.Second
-
 func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	customLogger := options.ExtractCustomLogger(_options...)
 	_executor := &dynamicExecutor{
@@ -219,265 +94,19 @@ func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
 	return &tracerWorkersOption{traceWorkers: traceWorkers}
 }
 
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
 type tracerWorkersOption struct {
 	options.Option
 	traceWorkers bool
 }
 
-func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
-	if runnable == nil {
-		value := atomic.Value{}
-		value.Store(errors.New("runnable must not be nil"))
-		return &future{err: value}
-	}
-	e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
-	completionFuture := &future{}
-	if e.shutdown {
-		completionFuture.Cancel(false, errors.New("executor in shutdown"))
-		return completionFuture
-	}
-	select {
-	case e.workItems <- workItem{
-		workItemId:       workItemId,
-		runnable:         runnable,
-		completionFuture: completionFuture,
-	}:
-		e.log.Trace().Msg("Item added")
-	case <-ctx.Done():
-		completionFuture.Cancel(false, ctx.Err())
-	}
-
-	e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
-	return completionFuture
-}
-
-func (e *executor) Start() {
-	e.stateChange.Lock()
-	defer e.stateChange.Unlock()
-	if e.running || e.shutdown {
-		e.log.Warn().Msg("Already started")
-		return
-	}
-	e.running = true
-	e.shutdown = false
-	for i := 0; i < len(e.worker); i++ {
-		worker := e.worker[i]
-		worker.initialize()
-		go worker.work()
-	}
-}
-
-func (e *dynamicExecutor) Start() {
-	e.dynamicStateChange.Lock()
-	defer e.dynamicStateChange.Unlock()
-	if e.running || e.shutdown {
-		e.log.Warn().Msg("Already started")
-		return
-	}
-	if e.interrupter != nil {
-		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
-		close(e.interrupter)
-		e.dynamicWorkers.Wait()
-	}
-
-	e.executor.Start()
-	mutex := sync.Mutex{}
-	e.interrupter = make(chan struct{})
-	// Worker spawner
-	go func() {
-		e.dynamicWorkers.Add(1)
-		defer e.dynamicWorkers.Done()
-		defer func() {
-			if err := recover(); err != nil {
-				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
-			}
-		}()
-		workerLog := e.log.With().Str("Worker type", "spawner").Logger()
-		if !e.traceWorkers {
-			workerLog = zerolog.Nop()
-		}
-		for e.running && !e.shutdown {
-			workerLog.Trace().Msg("running")
-			mutex.Lock()
-			numberOfItemsInQueue := len(e.workItems)
-			numberOfWorkers := len(e.worker)
-			workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
-			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
-				workerLog.Trace().Msg("spawning new worker")
-				_worker := &worker{
-					id:           numberOfWorkers - 1,
-					interrupter:  make(chan struct{}, 1),
-					executor:     e,
-					lastReceived: time.Now(),
-					log:          e.log,
-				}
-				e.worker = append(e.worker, _worker)
-				_worker.initialize()
-				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
-				go _worker.work()
-				e.currentNumberOfWorkers.Add(1)
-			} else {
-				workerLog.Trace().Msg("Nothing to scale")
-			}
-			mutex.Unlock()
-			func() {
-				workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
-				timer := time.NewTimer(upScaleInterval)
-				defer utils.CleanupTimer(timer)
-				select {
-				case <-timer.C:
-				case <-e.interrupter:
-					workerLog.Info().Msg("interrupted")
-				}
-			}()
-		}
-		workerLog.Info().Msg("Terminated")
-	}()
-	// Worker killer
-	go func() {
-		e.dynamicWorkers.Add(1)
-		defer e.dynamicWorkers.Done()
-		defer func() {
-			if err := recover(); err != nil {
-				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
-			}
-		}()
-		workerLog := e.log.With().Str("Worker type", "killer").Logger()
-		if !e.traceWorkers {
-			workerLog = zerolog.Nop()
-		}
-		for e.running && !e.shutdown {
-			workerLog.Trace().Msg("running")
-			mutex.Lock()
-			newWorkers := make([]*worker, 0)
-			for _, _worker := range e.worker {
-				deadline := time.Now().Add(-timeToBecomeUnused)
-				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
-				if _worker.lastReceived.Before(deadline) {
-					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
-					_worker.interrupted.Store(true)
-					close(_worker.interrupter)
-					e.currentNumberOfWorkers.Add(-1)
-				} else {
-					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
-					newWorkers = append(newWorkers, _worker)
-				}
-			}
-			e.worker = newWorkers
-			mutex.Unlock()
-			func() {
-				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
-				timer := time.NewTimer(downScaleInterval)
-				defer utils.CleanupTimer(timer)
-				select {
-				case <-timer.C:
-				case <-e.interrupter:
-					workerLog.Info().Msg("interrupted")
-				}
-			}()
-		}
-		workerLog.Info().Msg("Terminated")
-	}()
-}
-
-func (e *executor) Stop() {
-	e.log.Trace().Msg("stopping now")
-	e.stateChange.Lock()
-	defer e.stateChange.Unlock()
-	if !e.running || e.shutdown {
-		e.log.Warn().Msg("already stopped")
-		return
-	}
-	e.shutdown = true
-	for i := 0; i < len(e.worker); i++ {
-		worker := e.worker[i]
-		worker.shutdown.Store(true)
-		worker.interrupted.Store(true)
-		close(worker.interrupter)
-	}
-	e.running = false
-	e.shutdown = false
-	e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
-	e.workerWaitGroup.Wait()
-	e.log.Trace().Msg("stopped")
-}
-
-func (e *dynamicExecutor) Stop() {
-	e.log.Trace().Msg("stopping now")
-	e.dynamicStateChange.Lock()
-	defer e.dynamicStateChange.Unlock()
-	if !e.running || e.shutdown {
-		e.log.Warn().Msg("already stopped")
-		return
-	}
-	close(e.interrupter)
-	e.log.Trace().Msg("stopping inner executor")
-	e.executor.Stop()
-	e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
-	e.dynamicWorkers.Wait()
-	e.log.Trace().Msg("stopped")
-}
-
-func (e *executor) Close() error {
-	e.Stop()
-	return nil
-}
-
-func (e *executor) IsRunning() bool {
-	return e.running && !e.shutdown
-}
-
-type CompletionFuture interface {
-	AwaitCompletion(ctx context.Context) error
-	Cancel(interrupt bool, err error)
-}
-
-type future struct {
-	cancelRequested    atomic.Bool
-	interruptRequested atomic.Bool
-	completed          atomic.Bool
-	errored            atomic.Bool
-	err                atomic.Value
-}
-
-func (f *future) Cancel(interrupt bool, err error) {
-	f.cancelRequested.Store(true)
-	f.interruptRequested.Store(interrupt)
-	if err != nil {
-		f.errored.Store(true)
-		f.err.Store(err)
-	}
-}
-
-func (f *future) complete() {
-	f.completed.Store(true)
-}
-
-// Canceled is returned on CompletionFuture.AwaitCompletion when a CompletionFuture was canceled
-var Canceled = errors.New("Canceled")
-
-func (f *future) AwaitCompletion(ctx context.Context) error {
-	for !f.completed.Load() && !f.errored.Load() && !f.cancelRequested.Load() && ctx.Err() == nil {
-		time.Sleep(time.Millisecond * 10)
-	}
-	if err := ctx.Err(); err != nil {
-		return err
-	}
-	if err, ok := f.err.Load().(error); ok {
-		return err
-	}
-	if f.cancelRequested.Load() {
-		return Canceled
-	}
-	return nil
-}
-
-func (f *future) String() string {
-	return fmt.Sprintf("future: cancelRequested(%t), interruptRequested(%t), completed(%t), errored(%t), err(%v)",
-		f.cancelRequested.Load(),
-		f.interruptRequested.Load(),
-		f.completed.Load(),
-		f.errored.Load(),
-		f.err.Load(),
-	)
-}
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index 9a62a09d25..b531d94e7e 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -21,9 +21,7 @@ package pool
 
 import (
 	"context"
-	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
-	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"math/rand"
@@ -32,212 +30,6 @@ import (
 	"time"
 )
 
-func TestExecutor_Start(t *testing.T) {
-	type fields struct {
-		running      bool
-		shutdown     bool
-		worker       []*worker
-		queue        chan workItem
-		traceWorkers bool
-	}
-	tests := []struct {
-		name      string
-		fields    fields
-		shouldRun bool
-	}{
-		{
-			name:      "Start fresh",
-			shouldRun: true,
-		},
-		{
-			name: "Start running",
-			fields: fields{
-				running: true,
-			},
-			shouldRun: true,
-		},
-		{
-			name: "Start stopping",
-			fields: fields{
-				running:  true,
-				shutdown: true,
-			},
-			shouldRun: false,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			e := &executor{
-				running:      tt.fields.running,
-				shutdown:     tt.fields.shutdown,
-				worker:       tt.fields.worker,
-				workItems:    tt.fields.queue,
-				traceWorkers: tt.fields.traceWorkers,
-			}
-			e.Start()
-			assert.Equal(t, tt.shouldRun, e.IsRunning(), "should be running")
-		})
-	}
-}
-
-func TestExecutor_Stop(t *testing.T) {
-	type fields struct {
-		running      bool
-		shutdown     bool
-		worker       []*worker
-		queue        chan workItem
-		traceWorkers bool
-	}
-	tests := []struct {
-		name      string
-		fields    fields
-		shouldRun bool
-	}{
-		{
-			name:      "Stop stopped",
-			shouldRun: false,
-		},
-		{
-			name: "Stop running",
-			fields: fields{
-				running: true,
-				queue:   make(chan workItem),
-				worker: []*worker{
-					func() *worker {
-						w := &worker{}
-						w.initialize()
-						return w
-					}(),
-				},
-			},
-			shouldRun: false,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			e := &executor{
-				running:      tt.fields.running,
-				shutdown:     tt.fields.shutdown,
-				worker:       tt.fields.worker,
-				workItems:    tt.fields.queue,
-				traceWorkers: tt.fields.traceWorkers,
-			}
-			e.Stop()
-		})
-	}
-}
-
-func TestExecutor_Submit(t *testing.T) {
-	type fields struct {
-		running      bool
-		shutdown     bool
-		worker       []*worker
-		queue        chan workItem
-		traceWorkers bool
-	}
-	type args struct {
-		workItemId int32
-		runnable   Runnable
-		context    context.Context
-	}
-	tests := []struct {
-		name                      string
-		fields                    fields
-		args                      args
-		completionFutureValidator func(t *testing.T, future CompletionFuture) bool
-		waitForCompletion         bool
-	}{
-		{
-			name: "submitting nothing",
-			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
-				return assert.Error(t, completionFuture.(*future).err.Load().(error))
-			},
-		},
-		{
-			name: "submit canceled",
-			fields: fields{
-				queue: make(chan workItem, 0),
-			},
-			args: args{
-				workItemId: 13,
-				runnable: func() {
-					// We do something for 3 seconds
-					<-time.NewTimer(3 * time.Second).C
-				},
-				context: func() context.Context {
-					ctx, cancelFunc := context.WithCancel(context.Background())
-					cancelFunc()
-					return ctx
-				}(),
-			},
-			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
-				err := completionFuture.(*future).err.Load().(error)
-				return assert.Error(t, err)
-			},
-		},
-		{
-			name: "Submit something which doesn't complete",
-			fields: fields{
-				queue: make(chan workItem, 1),
-			},
-			args: args{
-				workItemId: 13,
-				runnable: func() {
-					// We do something for 3 seconds
-					<-time.NewTimer(3 * time.Second).C
-				},
-				context: context.TODO(),
-			},
-			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
-				completed := completionFuture.(*future).completed.Load()
-				return assert.False(t, completed)
-			},
-		},
-		{
-			name: "Submit something which does complete",
-			fields: func() fields {
-				var executor = NewFixedSizeExecutor(1, 1).(*executor)
-				return fields{
-					running:      executor.running,
-					shutdown:     executor.shutdown,
-					worker:       executor.worker,
-					queue:        executor.workItems,
-					traceWorkers: true,
-				}
-			}(),
-			args: args{
-				workItemId: 13,
-				runnable: func() {
-					// NOOP
-				},
-				context: context.TODO(),
-			},
-			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
-				completed := completionFuture.(*future).completed.Load()
-				return assert.True(t, completed)
-			},
-			waitForCompletion: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			e := &executor{
-				running:      tt.fields.running,
-				shutdown:     tt.fields.shutdown,
-				worker:       tt.fields.worker,
-				workItems:    tt.fields.queue,
-				traceWorkers: tt.fields.traceWorkers,
-			}
-			e.Start()
-			completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
-			if tt.waitForCompletion {
-				assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
-			}
-			assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
-		})
-	}
-}
-
 func TestNewFixedSizeExecutor(t *testing.T) {
 	type args struct {
 		numberOfWorkers int
@@ -407,487 +199,6 @@ func TestWithExecutorOptionTracerWorkers(t *testing.T) {
 	}
 }
 
-func TestWorkItem_String(t *testing.T) {
-	type fields struct {
-		workItemId       int32
-		runnable         Runnable
-		completionFuture *future
-	}
-	tests := []struct {
-		name   string
-		fields fields
-		want   string
-	}{
-		{
-			name: "Simple test",
-			want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			w := &workItem{
-				workItemId:       tt.fields.workItemId,
-				runnable:         tt.fields.runnable,
-				completionFuture: tt.fields.completionFuture,
-			}
-			assert.Equalf(t, tt.want, w.String(), "String()")
-		})
-	}
-}
-
-func TestWorker_work(t *testing.T) {
-	type fields struct {
-		id       int
-		executor *executor
-	}
-	tests := []struct {
-		name                       string
-		fields                     fields
-		timeBeforeFirstValidation  time.Duration
-		firstValidation            func(*testing.T, *worker)
-		timeBeforeManipulation     time.Duration
-		manipulator                func(*worker)
-		timeBeforeSecondValidation time.Duration
-		secondValidation           func(*testing.T, *worker)
-	}{
-		{
-			name: "Worker should work till shutdown (even if it panics)",
-			fields: fields{
-				id: 0,
-				executor: func() *executor {
-					e := &executor{
-						workItems:    make(chan workItem),
-						traceWorkers: true,
-					}
-					go func() {
-						e.workItems <- workItem{
-							workItemId: 0,
-							runnable: func() {
-								panic("Oh no what should I do???")
-							},
-							completionFuture: &future{},
-						}
-					}()
-					return e
-				}(),
-			},
-			timeBeforeFirstValidation: 50 * time.Millisecond,
-			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
-			},
-			manipulator: func(w *worker) {
-				w.shutdown.Store(true)
-				w.interrupter <- struct{}{}
-			},
-			timeBeforeSecondValidation: 150 * time.Millisecond,
-			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
-			},
-		},
-		{
-			name: "Worker should work till shutdown",
-			fields: fields{
-				id: 1,
-				executor: func() *executor {
-					e := &executor{
-						workItems:    make(chan workItem),
-						traceWorkers: true,
-					}
-					go func() {
-						e.workItems <- workItem{
-							workItemId: 0,
-							runnable: func() {
-								time.Sleep(time.Millisecond * 70)
-							},
-							completionFuture: &future{},
-						}
-					}()
-					return e
-				}(),
-			},
-			timeBeforeFirstValidation: 50 * time.Millisecond,
-			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
-			},
-			manipulator: func(w *worker) {
-				w.shutdown.Store(true)
-			},
-			timeBeforeSecondValidation: 150 * time.Millisecond,
-			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
-			},
-		},
-		{
-			name: "Work interrupted",
-			fields: fields{
-				id: 1,
-				executor: func() *executor {
-					e := &executor{
-						workItems:    make(chan workItem),
-						traceWorkers: true,
-					}
-					return e
-				}(),
-			},
-			timeBeforeFirstValidation: 50 * time.Millisecond,
-			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
-			},
-			manipulator: func(w *worker) {
-				w.shutdown.Store(true)
-				w.interrupter <- struct{}{}
-			},
-			timeBeforeSecondValidation: 150 * time.Millisecond,
-			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
-			},
-		},
-		{
-			name: "Work on canceled",
-			fields: fields{
-				id: 1,
-				executor: func() *executor {
-					e := &executor{
-						workItems:    make(chan workItem),
-						traceWorkers: true,
-					}
-					go func() {
-						completionFuture := &future{}
-						completionFuture.cancelRequested.Store(true)
-						e.workItems <- workItem{
-							workItemId: 0,
-							runnable: func() {
-								time.Sleep(time.Millisecond * 70)
-							},
-							completionFuture: completionFuture,
-						}
-					}()
-					return e
-				}(),
-			},
-			timeBeforeManipulation: 50 * time.Millisecond,
-			manipulator: func(w *worker) {
-				w.shutdown.Store(true)
-				w.interrupter <- struct{}{}
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			w := &worker{
-				id:          tt.fields.id,
-				interrupter: make(chan struct{}, 1),
-				executor:    tt.fields.executor,
-			}
-			go w.work()
-			if tt.firstValidation != nil {
-				time.Sleep(tt.timeBeforeFirstValidation)
-				t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
-				tt.firstValidation(t, w)
-			}
-			if tt.manipulator != nil {
-				time.Sleep(tt.timeBeforeManipulation)
-				t.Logf("manipulator after %v", tt.timeBeforeManipulation)
-				tt.manipulator(w)
-			}
-			if tt.secondValidation != nil {
-				time.Sleep(tt.timeBeforeSecondValidation)
-				t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
-				tt.secondValidation(t, w)
-			}
-		})
-	}
-}
-
-func Test_future_AwaitCompletion(t *testing.T) {
-	type args struct {
-		ctx context.Context
-	}
-	tests := []struct {
-		name      string
-		args      args
-		completer func(*future)
-		wantErr   assert.ErrorAssertionFunc
-	}{
-		{
-			name: "completes with error",
-			args: args{ctx: context.TODO()},
-			completer: func(f *future) {
-				f.Cancel(false, errors.New("Uh oh"))
-			},
-			wantErr: assert.Error,
-		},
-		{
-			name: "completes regular",
-			args: args{ctx: context.TODO()},
-			completer: func(f *future) {
-				time.Sleep(time.Millisecond * 30)
-				f.complete()
-			},
-			wantErr: assert.NoError,
-		},
-		{
-			name: "completes not int time",
-			args: args{ctx: func() context.Context {
-				deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Millisecond))
-				t.Cleanup(cancel)
-				return deadline
-			}()},
-			completer: func(f *future) {
-				time.Sleep(time.Millisecond * 300)
-			},
-			wantErr: assert.Error,
-		},
-		{
-			name: "completes canceled without error",
-			args: args{ctx: context.TODO()},
-			completer: func(f *future) {
-				time.Sleep(time.Millisecond * 300)
-				f.Cancel(true, nil)
-			},
-			wantErr: func(t assert.TestingT, err error, i ...any) bool {
-				assert.Same(t, Canceled, err)
-				return true
-			},
-		},
-		{
-			name: "completes canceled with particular error",
-			args: args{ctx: context.TODO()},
-			completer: func(f *future) {
-				time.Sleep(time.Millisecond * 300)
-				f.Cancel(true, errors.New("Uh oh"))
-			},
-			wantErr: func(t assert.TestingT, err error, i ...any) bool {
-				assert.Equal(t, "Uh oh", err.Error())
-				return true
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			f := &future{}
-			go tt.completer(f)
-			tt.wantErr(t, f.AwaitCompletion(tt.args.ctx), fmt.Sprintf("AwaitCompletion(%v)", tt.args.ctx))
-		})
-	}
-}
-
-func Test_future_Cancel(t *testing.T) {
-	type args struct {
-		interrupt bool
-		err       error
-	}
-	tests := []struct {
-		name     string
-		args     args
-		verifier func(*testing.T, *future)
-	}{
-		{
-			name: "cancel cancels",
-			verifier: func(t *testing.T, f *future) {
-				assert.True(t, f.cancelRequested.Load())
-			},
-		},
-		{
-			name: "cancel with interrupt",
-			args: args{
-				interrupt: true,
-				err:       nil,
-			},
-			verifier: func(t *testing.T, f *future) {
-				assert.True(t, f.cancelRequested.Load())
-				assert.False(t, f.errored.Load())
-				assert.Nil(t, f.err.Load())
-			},
-		},
-		{
-			name: "cancel with err",
-			args: args{
-				interrupt: true,
-				err:       errors.New("Uh Oh"),
-			},
-			verifier: func(t *testing.T, f *future) {
-				assert.True(t, f.cancelRequested.Load())
-				assert.True(t, f.errored.Load())
-				assert.NotNil(t, f.err.Load())
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			f := &future{}
-			f.Cancel(tt.args.interrupt, tt.args.err)
-			tt.verifier(t, f)
-		})
-	}
-}
-
-func Test_future_complete(t *testing.T) {
-	tests := []struct {
-		name     string
-		verifier func(*testing.T, *future)
-	}{
-		{
-			name: "complete completes",
-			verifier: func(t *testing.T, f *future) {
-				assert.True(t, f.completed.Load())
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			f := &future{}
-			f.complete()
-			tt.verifier(t, f)
-		})
-	}
-}
-
-func Test_dynamicExecutor_Start(t *testing.T) {
-	type fields struct {
-		executor           *executor
-		maxNumberOfWorkers int
-	}
-	tests := []struct {
-		name       string
-		fields     fields
-		setup      func(t *testing.T, fields *fields)
-		startTwice bool
-	}{
-		{
-			name: "just start",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-		},
-		{
-			name: "start twice",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-			startTwice: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if tt.setup != nil {
-				tt.setup(t, &tt.fields)
-			}
-			e := &dynamicExecutor{
-				executor:           tt.fields.executor,
-				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
-			}
-			e.Start()
-			if tt.startTwice {
-				e.Start()
-			}
-			// Let it work a bit
-			time.Sleep(20 * time.Millisecond)
-			t.Log("done with test")
-			t.Cleanup(e.Stop)
-		})
-	}
-}
-
-func Test_dynamicExecutor_Stop(t *testing.T) {
-	type fields struct {
-		executor           *executor
-		maxNumberOfWorkers int
-		interrupter        chan struct{}
-	}
-	tests := []struct {
-		name      string
-		fields    fields
-		setup     func(t *testing.T, fields *fields)
-		startIt   bool
-		stopTwice bool
-	}{
-		{
-			name: "just stop",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-		},
-		{
-			name: "stop started",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-		},
-		{
-			name: "stop twice",
-			fields: fields{
-				executor: &executor{
-					workItems:    make(chan workItem, 1),
-					worker:       make([]*worker, 0),
-					traceWorkers: true,
-				},
-				maxNumberOfWorkers: 100,
-			},
-			setup: func(t *testing.T, fields *fields) {
-				fields.executor.log = produceTestLogger(t)
-				fields.executor.workItems <- workItem{1, func() {}, &future{}}
-			},
-			stopTwice: true,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			if tt.setup != nil {
-				tt.setup(t, &tt.fields)
-			}
-			e := &dynamicExecutor{
-				executor:           tt.fields.executor,
-				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
-				interrupter:        tt.fields.interrupter,
-			}
-			if tt.startIt {
-				e.Start()
-			}
-			e.Stop()
-			if tt.stopTwice {
-				e.Stop()
-			}
-		})
-	}
-}
-
 // from: https://github.com/golang/go/issues/36532#issuecomment-575535452
 func testContext(t *testing.T) context.Context {
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
new file mode 100644
index 0000000000..22f8d20e5b
--- /dev/null
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"github.com/apache/plc4x/plc4go/spi/utils"
+	"github.com/rs/zerolog"
+	"runtime/debug"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// upScaleInterval is the pause between two scale-up checks of the spawner goroutine.
+var upScaleInterval = 100 * time.Millisecond
+
+// downScaleInterval is the pause between two scale-down checks of the killer goroutine.
+var downScaleInterval = 5 * time.Second
+
+// timeToBecomeUnused is how far back a worker's lastReceived may lie before the
+// killer goroutine interrupts it and drops it from the pool.
+var timeToBecomeUnused = 5 * time.Second
+
+// dynamicExecutor embeds an executor and resizes its worker slice at runtime.
+// Start launches two background goroutines: a spawner that adds workers while
+// more items are queued than workers exist (bounded by maxNumberOfWorkers), and
+// a killer that removes workers idle for longer than timeToBecomeUnused.
+type dynamicExecutor struct {
+	*executor
+
+	// maxNumberOfWorkers is the limit the spawner checks before adding a worker.
+	maxNumberOfWorkers int
+	// currentNumberOfWorkers is incremented by the spawner and decremented by the killer.
+	currentNumberOfWorkers atomic.Int32
+	// dynamicStateChange serializes Start and Stop.
+	dynamicStateChange sync.Mutex
+	// interrupter is closed to wake the spawner and killer from their sleep.
+	interrupter chan struct{}
+
+	// dynamicWorkers tracks the spawner and killer goroutines.
+	dynamicWorkers sync.WaitGroup
+}
+
+// Start starts the embedded executor and launches the spawner and killer
+// goroutines. If the executor is already running or shutting down it only logs
+// a warning.
+func (e *dynamicExecutor) Start() {
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	if e.interrupter != nil {
+		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
+		close(e.interrupter)
+		e.dynamicWorkers.Wait()
+	}
+
+	e.executor.Start()
+	// mutex guards e.worker between the spawner and the killer below.
+	mutex := sync.Mutex{}
+	e.interrupter = make(chan struct{})
+	// Worker spawner
+	// Add is called before the go statement so that a Wait in Stop can never
+	// observe a zero counter while this goroutine is about to run.
+	e.dynamicWorkers.Add(1)
+	go func() {
+		defer e.dynamicWorkers.Done()
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "spawner").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Trace().Msg("running")
+			mutex.Lock()
+			numberOfItemsInQueue := len(e.workItems)
+			numberOfWorkers := len(e.worker)
+			workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+				workerLog.Trace().Msg("spawning new worker")
+				_worker := &worker{
+					// NOTE(review): this yields -1 for the first worker of an empty pool — confirm intended.
+					id:           numberOfWorkers - 1,
+					interrupter:  make(chan struct{}, 1),
+					executor:     e,
+					lastReceived: time.Now(),
+					log:          e.log,
+				}
+				e.worker = append(e.worker, _worker)
+				_worker.initialize()
+				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
+				go _worker.work()
+				e.currentNumberOfWorkers.Add(1)
+			} else {
+				workerLog.Trace().Msg("Nothing to scale")
+			}
+			mutex.Unlock()
+			// Sleep until the next check, or until Stop closes the interrupter.
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+				timer := time.NewTimer(upScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
+		}
+		workerLog.Info().Msg("Terminated")
+	}()
+	// Worker killer
+	e.dynamicWorkers.Add(1)
+	go func() {
+		defer e.dynamicWorkers.Done()
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "killer").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Trace().Msg("running")
+			mutex.Lock()
+			newWorkers := make([]*worker, 0)
+			for _, _worker := range e.worker {
+				deadline := time.Now().Add(-timeToBecomeUnused)
+				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+				if _worker.lastReceived.Before(deadline) {
+					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
+					_worker.interrupted.Store(true)
+					close(_worker.interrupter)
+					e.currentNumberOfWorkers.Add(-1)
+				} else {
+					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
+					newWorkers = append(newWorkers, _worker)
+				}
+			}
+			e.worker = newWorkers
+			mutex.Unlock()
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+				timer := time.NewTimer(downScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
+		}
+		workerLog.Info().Msg("Terminated")
+	}()
+}
+
+// Stop closes the interrupter, stops the embedded executor and waits for the
+// spawner and killer goroutines to return. If the executor is not running it
+// only logs a warning.
+func (e *dynamicExecutor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if !e.running || e.shutdown {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	if e.interrupter != nil {
+		close(e.interrupter)
+	}
+	e.log.Trace().Msg("stopping inner executor")
+	e.executor.Stop()
+	e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
+	e.dynamicWorkers.Wait()
+	// The channel is closed now; forget it so that a later Start does not close
+	// it a second time (which would panic).
+	e.interrupter = nil
+	e.log.Trace().Msg("stopped")
+}
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
new file mode 100644
index 0000000000..1db92a5d7f
--- /dev/null
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"testing"
+	"time"
+)
+
+// Test_dynamicExecutor_Start starts a dynamicExecutor (optionally twice) with one
+// queued work item, lets it run briefly and stops it again during cleanup.
+func Test_dynamicExecutor_Start(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+	}
+	tests := []struct {
+		name       string
+		fields     fields
+		setup      func(t *testing.T, fields *fields)
+		startTwice bool
+	}{
+		{
+			name: "just start",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "start twice",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			startTwice: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+			}
+			e.Start()
+			if tt.startTwice {
+				e.Start()
+			}
+			// Let it work a bit
+			time.Sleep(20 * time.Millisecond)
+			t.Log("done with test")
+			t.Cleanup(e.Stop)
+		})
+	}
+}
+
+// Test_dynamicExecutor_Stop stops a dynamicExecutor that was never started, one
+// that was started, and one that is stopped twice.
+func Test_dynamicExecutor_Stop(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+		interrupter        chan struct{}
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		setup     func(t *testing.T, fields *fields)
+		startIt   bool
+		stopTwice bool
+	}{
+		{
+			name: "just stop",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "stop started",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			startIt: true,
+		},
+		{
+			name: "stop twice",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			stopTwice: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+				interrupter:        tt.fields.interrupter,
+			}
+			if tt.startIt {
+				e.Start()
+				// Let it work a bit
+				time.Sleep(20 * time.Millisecond)
+			}
+			e.Stop()
+			if tt.stopTwice {
+				e.Stop()
+			}
+		})
+	}
+}
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
new file mode 100644
index 0000000000..79bddb6dc0
--- /dev/null
+++ b/plc4go/spi/pool/executor.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+
+	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
+)
+
+// executor is a worker pool: Submit puts work items on the workItems channel
+// and the workers held in worker are started by Start and interrupted by Stop.
+type executor struct {
+	running      bool
+	shutdown     bool
+	stateChange  sync.Mutex
+	worker       []*worker
+	queueDepth   int
+	workItems    chan workItem
+	traceWorkers bool
+
+	workerWaitGroup sync.WaitGroup
+
+	log zerolog.Logger
+}
+
+// isTraceWorkers returns the traceWorkers flag of this executor.
+func (e *executor) isTraceWorkers() bool {
+	return e.traceWorkers
+}
+
+// getWorksItems returns the channel on which submitted work items are queued.
+func (e *executor) getWorksItems() chan workItem {
+	return e.workItems
+}
+
+// getWorkerWaitGroup returns a pointer to the wait group that Stop waits on.
+func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
+	return &e.workerWaitGroup
+}
+
+// Submit queues runnable under workItemId and returns a future for it.
+// The returned future carries an error instead when runnable is nil, when the
+// executor is shutting down, or when ctx is done before the item could be queued.
+func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
+	if runnable == nil {
+		value := atomic.Value{}
+		value.Store(errors.New("runnable must not be nil"))
+		return &future{err: value}
+	}
+	e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
+	completionFuture := &future{}
+	if e.shutdown {
+		completionFuture.Cancel(false, errors.New("executor in shutdown"))
+		return completionFuture
+	}
+	select {
+	case e.workItems <- workItem{
+		workItemId:       workItemId,
+		runnable:         runnable,
+		completionFuture: completionFuture,
+	}:
+		e.log.Trace().Msg("Item added")
+	case <-ctx.Done():
+		completionFuture.Cancel(false, ctx.Err())
+		// Nothing was queued, so skip the "runnable queued" trace below.
+		return completionFuture
+	}
+
+	e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
+	return completionFuture
+}
+
+// Start marks the executor as running and launches one goroutine per worker.
+// If it is already running or shutting down it only logs a warning.
+func (e *executor) Start() {
+	e.stateChange.Lock()
+	defer e.stateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	e.running = true
+	e.shutdown = false
+	for i := 0; i < len(e.worker); i++ {
+		w := e.worker[i]
+		w.initialize()
+		go w.work()
+	}
+}
+
+// Stop flags every worker for shutdown, closes its interrupter channel and then
+// waits on the worker wait group. If the executor is not running it only logs a
+// warning.
+func (e *executor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.stateChange.Lock()
+	defer e.stateChange.Unlock()
+	if !e.running || e.shutdown {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	e.shutdown = true
+	for i := 0; i < len(e.worker); i++ {
+		w := e.worker[i]
+		w.shutdown.Store(true)
+		w.interrupted.Store(true)
+		close(w.interrupter)
+	}
+	e.running = false
+	e.shutdown = false
+	e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
+	e.workerWaitGroup.Wait()
+	e.log.Trace().Msg("stopped")
+}
+
+// Close stops the executor; it always returns nil.
+func (e *executor) Close() error {
+	e.Stop()
+	return nil
+}
+
+// IsRunning reports whether the running flag is set and the shutdown flag is not.
+func (e *executor) IsRunning() bool {
+	return e.running && !e.shutdown
+}
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
new file mode 100644
index 0000000000..2a43b1366e
--- /dev/null
+++ b/plc4go/spi/pool/executor_test.go
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"context"
+	"fmt"
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/assert"
+	"sync"
+	"testing"
+	"time"
+)
+
+func Test_executor_Close(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{
+			name:    "close it",
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			tt.wantErr(t, e.Close(), fmt.Sprintf("Close()"))
+		})
+	}
+}
+
+func Test_executor_IsRunning(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   bool
+	}{
+		{
+			name: "no",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.IsRunning(), "IsRunning()")
+		})
+	}
+}
+
+func Test_executor_Start(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		shouldRun bool
+	}{
+		{
+			name:      "Start fresh",
+			shouldRun: true,
+		},
+		{
+			name: "Start running",
+			fields: fields{
+				running: true,
+			},
+			shouldRun: true,
+		},
+		{
+			name: "Start stopping",
+			fields: fields{
+				running:  true,
+				shutdown: true,
+			},
+			shouldRun: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				workItems:    tt.fields.queue,
+				traceWorkers: tt.fields.traceWorkers,
+			}
+			e.Start()
+			assert.Equal(t, tt.shouldRun, e.IsRunning(), "should be running")
+		})
+	}
+}
+
+func Test_executor_Stop(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		shouldRun bool
+	}{
+		{
+			name:      "Stop stopped",
+			shouldRun: false,
+		},
+		{
+			name: "Stop running",
+			fields: fields{
+				running: true,
+				queue:   make(chan workItem),
+				worker: []*worker{
+					func() *worker {
+						w := &worker{}
+						w.initialize()
+						return w
+					}(),
+				},
+			},
+			shouldRun: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				workItems:    tt.fields.queue,
+				traceWorkers: tt.fields.traceWorkers,
+			}
+			e.Stop()
+		})
+	}
+}
+
+func Test_executor_Submit(t *testing.T) { // table-driven: builds an executor per case, starts it, submits one work item and validates the returned future
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	type args struct {
+		workItemId int32
+		runnable   Runnable
+		context    context.Context
+	}
+	tests := []struct {
+		name                      string
+		fields                    fields
+		args                      args
+		completionFutureValidator func(t *testing.T, future CompletionFuture) bool
+		waitForCompletion         bool
+	}{
+		{
+			name: "submitting nothing", // all args are left at their zero value (nil runnable, nil context)
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				return assert.Error(t, completionFuture.(*future).err.Load().(error)) // the single-value type assertion panics if no error was stored
+			},
+		},
+		{
+			name: "submit canceled",
+			fields: fields{
+				queue: make(chan workItem), // unbuffered queue and no worker configured for this case
+			},
+			args: args{
+				workItemId: 13,
+				runnable: func() {
+					// We do something for 3 seconds
+					<-time.NewTimer(3 * time.Second).C
+				},
+				context: func() context.Context {
+					ctx, cancelFunc := context.WithCancel(context.Background())
+					cancelFunc() // the context is already cancelled when it reaches Submit
+					return ctx
+				}(),
+			},
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				err := completionFuture.(*future).err.Load().(error)
+				return assert.Error(t, err)
+			},
+		},
+		{
+			name: "Submit something which doesn't complete",
+			fields: fields{
+				queue: make(chan workItem, 1), // buffer of one; no worker is configured for this case
+			},
+			args: args{
+				workItemId: 13,
+				runnable: func() {
+					// We do something for 3 seconds
+					<-time.NewTimer(3 * time.Second).C
+				},
+				context: context.TODO(),
+			},
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				completed := completionFuture.(*future).completed.Load() // read right after Submit, without awaiting the future
+				return assert.False(t, completed)
+			},
+		},
+		{
+			name: "Submit something which does complete",
+			fields: func() fields { // borrows running/shutdown/worker/queue from a NewFixedSizeExecutor(1, 1) instance
+				var executor = NewFixedSizeExecutor(1, 1).(*executor)
+				return fields{
+					running:      executor.running,
+					shutdown:     executor.shutdown,
+					worker:       executor.worker,
+					queue:        executor.workItems,
+					traceWorkers: true,
+				}
+			}(),
+			args: args{
+				workItemId: 13,
+				runnable: func() {
+					// NOOP
+				},
+				context: context.TODO(),
+			},
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				completed := completionFuture.(*future).completed.Load()
+				return assert.True(t, completed)
+			},
+			waitForCompletion: true, // this case awaits the future before validating it
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				workItems:    tt.fields.queue,
+				traceWorkers: tt.fields.traceWorkers,
+			}
+			e.Start() // NOTE(review): the executor is never stopped by this test — confirm no goroutines are left behind
+			completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
+			if tt.waitForCompletion {
+				assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
+			}
+			assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
+		})
+	}
+}
+
+func Test_executor_getWorkerWaitGroup(t *testing.T) { // checks the value returned by executor.getWorkerWaitGroup()
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   *sync.WaitGroup
+	}{
+		{
+			name: "get it", // fields stay zero-valued, so the executor under test is built from zero values
+			want: &sync.WaitGroup{},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.getWorkerWaitGroup(), "getWorkerWaitGroup()") // compared against a pointer to a zero-valued WaitGroup
+		})
+	}
+}
+
+func Test_executor_getWorksItems(t *testing.T) { // checks the value returned by executor.getWorksItems()
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   chan workItem
+	}{
+		{
+			name: "get it", // neither fields nor want are set: a nil workItems channel goes in and a nil channel is expected back
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.getWorksItems(), "getWorksItems()")
+		})
+	}
+}
+
+func Test_executor_isTraceWorkers(t *testing.T) { // checks the value returned by executor.isTraceWorkers()
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   bool
+	}{
+		{
+			name: "it is not", // traceWorkers and want both stay at their zero value (false)
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			e := &executor{
+				running:      tt.fields.running,
+				shutdown:     tt.fields.shutdown,
+				worker:       tt.fields.worker,
+				queueDepth:   tt.fields.queueDepth,
+				workItems:    tt.fields.workItems,
+				traceWorkers: tt.fields.traceWorkers,
+				log:          tt.fields.log,
+			}
+			assert.Equalf(t, tt.want, e.isTraceWorkers(), "isTraceWorkers()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/workItem.go b/plc4go/spi/pool/workItem.go
new file mode 100644
index 0000000000..942480bc1a
--- /dev/null
+++ b/plc4go/spi/pool/workItem.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import "fmt"
+
+type workItem struct { // workItem is the unit sent through the executor's work item channel
+	workItemId       int32    // numeric id; printed as "wid" by String
+	runnable         Runnable // invoked by worker.work when the item is picked up and not cancelled
+	completionFuture *future  // completed by worker.work after runnable returns; its cancelRequested flag makes the worker skip the item
+}
+
+func (w workItem) String() string { // String renders the id, whether a runnable is set, and the completion future
+	return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture) // NOTE(review): the "}" after runnable(%t) closes the brace early; Test_workItem_String pins this exact text
+}
diff --git a/plc4go/spi/pool/workItem_test.go b/plc4go/spi/pool/workItem_test.go
new file mode 100644
index 0000000000..ebc4b30c5c
--- /dev/null
+++ b/plc4go/spi/pool/workItem_test.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func Test_workItem_String(t *testing.T) { // pins the exact text produced by workItem.String()
+	type fields struct {
+		workItemId       int32
+		runnable         Runnable
+		completionFuture *future
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{
+			name: "Simple test", // fields stay zero-valued: id 0, nil runnable, nil future
+			want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &workItem{
+				workItemId:       tt.fields.workItemId,
+				runnable:         tt.fields.runnable,
+				completionFuture: tt.fields.completionFuture,
+			}
+			assert.Equalf(t, tt.want, w.String(), "String()") // String has a value receiver and is called here through a pointer
+		})
+	}
+}
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
new file mode 100644
index 0000000000..7530ed0ad9
--- /dev/null
+++ b/plc4go/spi/pool/worker.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"runtime/debug"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/rs/zerolog"
+)
+
+type worker struct { // worker holds the state of one work loop (see work)
+	id          int           // attached as "Worker id" to the worker's logger
+	shutdown    atomic.Bool   // once true, the work loop is left after the current iteration and is not restarted
+	interrupted atomic.Bool   // together with shutdown makes work skip a received item
+	interrupter chan struct{} // a value sent here wakes the select in work
+	executor    interface { // the operations work() calls on its executor
+		isTraceWorkers() bool                // when false, work() swaps its logger for a no-op logger
+		getWorksItems() chan workItem        // channel work() receives items from
+		getWorkerWaitGroup() *sync.WaitGroup // work() calls Add(1) on entry and Done() on exit
+	}
+	hasEnded     atomic.Bool // set to false when work() starts and to true after its loop is left normally
+	lastReceived time.Time   // set by initialize and on every received item; NOTE(review): plain field written from the work goroutine — confirm readers synchronise
+
+	log zerolog.Logger
+}
+
+func (w *worker) initialize() { // initialize resets the flags, replaces the interrupter channel and stamps lastReceived
+	w.shutdown.Store(false)
+	w.interrupted.Store(false)
+	w.interrupter = make(chan struct{}, 1) // buffer of one: a single interrupt can be queued without a receiver
+	w.hasEnded.Store(false)
+	w.lastReceived = time.Now()
+}
+
+func (w *worker) work() { // work receives items from the executor's channel and runs them until shutdown is set
+	w.executor.getWorkerWaitGroup().Add(1) // NOTE(review): Add happens inside work itself; when started via "go w.work()" a concurrent Wait could run first — confirm callers
+	defer w.executor.getWorkerWaitGroup().Done()
+	defer func() {
+		if err := recover(); err != nil { // a panic raised in the loop below (e.g. by a runnable) is caught and logged here
+			w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+		}
+		if !w.shutdown.Load() {
+			// if we are not in shutdown we continue
+			w.work() // NOTE(review): the loop is restarted by a recursive call from this deferred func, so each recovered panic adds a stack frame
+		}
+	}()
+	workerLog := w.log.With().Int("Worker id", w.id).Logger()
+	if !w.executor.isTraceWorkers() {
+		workerLog = zerolog.Nop() // tracing disabled: the debug messages below are dropped
+	}
+	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
+	w.hasEnded.Store(false)
+	workerLog.Debug().Msgf("setting to not ended")
+
+	for !w.shutdown.Load() {
+		workerLog.Debug().Msg("Working")
+		select { // blocks until either a work item or an interrupt arrives
+		case _workItem := <-w.executor.getWorksItems():
+			w.lastReceived = time.Now()
+			workerLog.Debug().Msgf("Got work item %v", _workItem)
+			if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
+				workerLog.Debug().Msg("We need to stop")
+				// TODO: do we need to complete with a error?
+			} else {
+				workerLog.Debug().Msgf("Running work item %v", _workItem)
+				_workItem.runnable() // a panic here skips complete() below, so the item's future is left uncompleted
+				_workItem.completionFuture.complete()
+				workerLog.Debug().Msgf("work item %v completed", _workItem)
+			}
+		case <-w.interrupter:
+			workerLog.Debug().Msg("We got interrupted") // nothing else to do: the loop condition re-checks shutdown
+		}
+	}
+	w.hasEnded.Store(true) // reached only when the loop is left normally, not when a panic unwinds
+	workerLog.Debug().Msg("setting to ended")
+}
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
new file mode 100644
index 0000000000..4f604d2efb
--- /dev/null
+++ b/plc4go/spi/pool/worker_test.go
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_worker_initialize(t *testing.T) { // smoke test: calls initialize() on a worker built from zero values; nothing is asserted
+	type fields struct {
+		id          int
+		interrupter chan struct{}
+		executor    interface {
+			isTraceWorkers() bool
+			getWorksItems() chan workItem
+			getWorkerWaitGroup() *sync.WaitGroup
+		}
+		lastReceived time.Time
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+	}{
+		{
+			name: "do it", // fields stay zero-valued
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:           tt.fields.id,
+				interrupter:  tt.fields.interrupter,
+				executor:     tt.fields.executor,
+				lastReceived: tt.fields.lastReceived,
+				log:          tt.fields.log,
+			}
+			w.initialize() // passes as long as the call does not panic
+		})
+	}
+}
+
+func Test_worker_work(t *testing.T) { // runs work() in a goroutine and checks hasEnded before and after a manipulator changes the worker's state
+	type fields struct {
+		id       int
+		executor *executor
+	}
+	tests := []struct { // each optional step (firstValidation, manipulator, secondValidation) is preceded by a sleep of its timeBefore* duration
+		name                       string
+		fields                     fields
+		timeBeforeFirstValidation  time.Duration
+		firstValidation            func(*testing.T, *worker)
+		timeBeforeManipulation     time.Duration
+		manipulator                func(*worker)
+		timeBeforeSecondValidation time.Duration
+		secondValidation           func(*testing.T, *worker)
+	}{
+		{
+			name: "Worker should work till shutdown (even if it panics)",
+			fields: fields{
+				id: 0,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() { // feeds a single item into the unbuffered queue from a separate goroutine
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								panic("Oh no what should I do???")
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended") // work() resets hasEnded to false each time it is (re)entered
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{} // wakes the worker's select so the loop can observe shutdown
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			name: "Worker should work till shutdown",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								time.Sleep(time.Millisecond * 70) // longer than the 50ms wait before shutdown is set
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true) // no interrupt is sent; presumably the runnable is still sleeping, so the loop re-checks shutdown when it returns
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			name: "Work interrupted",
+			fields: fields{
+				id: 1,
+				executor: func() *executor { // nothing is ever sent on workItems in this case
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{} // the only event that can wake the worker here
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			name: "Work on canceled", // no validations are configured; the case only drives the cancelRequested branch of work()
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						completionFuture := &future{}
+						completionFuture.cancelRequested.Store(true) // flagged as cancelled before the item is queued
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								time.Sleep(time.Millisecond * 70)
+							},
+							completionFuture: completionFuture,
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeManipulation: 50 * time.Millisecond,
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:          tt.fields.id,
+				interrupter: make(chan struct{}, 1), // buffered so the manipulator's send does not block
+				executor:    tt.fields.executor,
+			}
+			go w.work() // NOTE(review): the goroutine is never awaited; the checks below rely on the sleeps for ordering
+			if tt.firstValidation != nil {
+				time.Sleep(tt.timeBeforeFirstValidation)
+				t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
+				tt.firstValidation(t, w)
+			}
+			if tt.manipulator != nil {
+				time.Sleep(tt.timeBeforeManipulation)
+				t.Logf("manipulator after %v", tt.timeBeforeManipulation)
+				tt.manipulator(w)
+			}
+			if tt.secondValidation != nil {
+				time.Sleep(tt.timeBeforeSecondValidation)
+				t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
+				tt.secondValidation(t, w)
+			}
+		})
+	}
+}