You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 13:50:56 UTC

[plc4x] 01/05: fix(plc4go/spi): fix race issues in worker pool

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 5e51e6606dc9fe58d8324a5e2e102e0c4537aeb4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 14:18:31 2023 +0200

    fix(plc4go/spi): fix race issues in worker pool
---
 plc4go/internal/cbus/Browser_test.go               |   3 +-
 plc4go/internal/cbus/CBusMessageMapper_test.go     |   2 +-
 plc4go/spi/pool/dynamicExecutor.go                 |   5 +-
 plc4go/spi/pool/dynamicExecutor_test.go            |  36 +++---
 plc4go/spi/pool/executor.go                        |  11 +-
 plc4go/spi/pool/executor_test.go                   |  27 ++--
 plc4go/spi/pool/worker.go                          |  64 +++++++---
 plc4go/spi/pool/worker_plc4xgen.go                 |  34 ++---
 plc4go/spi/pool/worker_test.go                     | 137 +++++++++++++++++++--
 .../transactions/RequestTransactionManager_test.go |  46 +++----
 10 files changed, 251 insertions(+), 114 deletions(-)

diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 501eb7371a..3fef9b7ee8 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -162,8 +162,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 				})
 				connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
 				if err := connectionConnectResult.GetErr(); err != nil {
-					t.Error(err)
-					t.FailNow()
+					t.Fatal(err)
 				}
 				fields.connection = connectionConnectResult.GetConnection()
 				t.Cleanup(func() {
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 1bacc4775b..588ba63d81 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1479,7 +1479,7 @@ func TestMapEncodedReply(t *testing.T) {
 					assert.NoError(t, transactionManager.Close())
 				})
 				transaction := transactionManager.StartTransaction()
-				t.Logf("Submitting No-Op to transaction %v", transaction)
+				t.Logf("Submitting No-Op to transaction\n%v", transaction)
 				transaction.Submit(func(transaction transactions.RequestTransaction) {
 					// NO-OP
 				})
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 7a1b7d5381..e2cf3afb56 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -91,7 +91,7 @@ func (e *dynamicExecutor) Start() {
 				e.worker = append(e.worker, _worker)
 				_worker.initialize()
 				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
-				go _worker.work()
+				_worker.start()
 				e.currentNumberOfWorkers.Add(1)
 			} else {
 				workerLog.Trace().Msg("Nothing to scale")
@@ -133,8 +133,7 @@ func (e *dynamicExecutor) Start() {
 				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived.Load(), deadline)
 				if _worker.lastReceived.Load().(time.Time).Before(deadline) {
 					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
-					_worker.interrupted.Store(true)
-					close(_worker.interrupter)
+					_worker.stop(true)
 					e.currentNumberOfWorkers.Add(-1)
 				} else {
 					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 107486eb52..e58ffd0f13 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -198,24 +198,24 @@ func Test_dynamicExecutor_String(t *testing.T) {
 				maxNumberOfWorkers: 3,
 			},
 			want: `
-╔═dynamicExecutor═══════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═executor════════════════════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═running╗╔═shutdown╗                                                                                    ║║
-║║║b0 false║║b0 false ║                                                                                    ║║
-║║╚════════╝╚═════════╝                                                                                    ║║
-║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║║
-║║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║║
-║║║║0x0000000000000000 0║║b0 false ║║  b0 false  ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║║
-║║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║║
-║║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                      ║║
-║║║0x0000000000000000 0║║0 element(s)║║  b0 false   ║                                                      ║║
-║║╚════════════════════╝╚════════════╝╚═════════════╝                                                      ║║
-║╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗                                              ║
-║║0x0000000000000003 3║║     0x00000000 0      ║║0 element(s)║                                              ║
-║╚════════════════════╝╚═══════════════════════╝╚════════════╝                                              ║
-╚═══════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═dynamicExecutor══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═running╗╔═shutdown╗                                                                                   ║║
+║║║b0 false║║b0 false ║                                                                                   ║║
+║║╚════════╝╚═════════╝                                                                                   ║║
+║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║║
+║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║║
+║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║║
+║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║
+║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
+║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║║
+║║║0x0000000000000000 0║║0 element(s)║║  b0 false   ║                                                     ║║
+║║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║║
+║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗                                             ║
+║║0x0000000000000003 3║║     0x00000000 0      ║║0 element(s)║                                             ║
+║╚════════════════════╝╚═══════════════════════╝╚════════════╝                                             ║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 654edf2973..794bfb4221 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -93,9 +93,9 @@ func (e *executor) Start() {
 	e.running = true
 	e.shutdown = false
 	for i := 0; i < len(e.worker); i++ {
-		worker := e.worker[i]
-		worker.initialize()
-		go worker.work()
+		_worker := e.worker[i]
+		_worker.initialize()
+		_worker.start()
 	}
 }
 
@@ -109,10 +109,7 @@ func (e *executor) Stop() {
 	}
 	e.shutdown = true
 	for i := 0; i < len(e.worker); i++ {
-		worker := e.worker[i]
-		worker.shutdown.Store(true)
-		worker.interrupted.Store(true)
-		close(worker.interrupter)
+		e.worker[i].stop(true)
 	}
 	e.running = false
 	e.shutdown = false
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index ee126bb989..15ecac74ca 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -434,7 +434,6 @@ func Test_executor_String(t *testing.T) {
 						id:          1,
 						shutdown:    atomic.Bool{},
 						interrupted: atomic.Bool{},
-						hasEnded:    atomic.Bool{},
 						lastReceived: func() atomic.Value {
 							value := atomic.Value{}
 							value.Store(time.Time{})
@@ -446,19 +445,19 @@ func Test_executor_String(t *testing.T) {
 				traceWorkers: true,
 			},
 			want: `
-╔═executor════════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═running╗╔═shutdown╗                                                                                    ║
-║║b1 true ║║ b1 true ║                                                                                    ║
-║╚════════╝╚═════════╝                                                                                    ║
-║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║
-║║║0x0000000000000001 1║║b0 false ║║  b0 false  ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║
-║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║
-║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                      ║
-║║0x0000000000000002 2║║0 element(s)║║   b1 true   ║                                                      ║
-║╚════════════════════╝╚════════════╝╚═════════════╝                                                      ║
-╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═running╗╔═shutdown╗                                                                                   ║
+║║b1 true ║║ b1 true ║                                                                                   ║
+║╚════════╝╚═════════╝                                                                                   ║
+║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║
+║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║
+║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║
+║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║
+║║0x0000000000000002 2║║0 element(s)║║   b1 true   ║                                                     ║
+║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║
+╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 0fa5ae597b..7b1308de73 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+	"github.com/rs/zerolog/log"
 	"runtime/debug"
 	"sync"
 	"sync/atomic"
@@ -30,18 +31,21 @@ import (
 
 //go:generate go run ../../tools/plc4xgenerator/gen.go -type=worker
 type worker struct {
-	id          int
-	shutdown    atomic.Bool
-	interrupted atomic.Bool
-	interrupter chan struct{}
-	executor    interface {
+	id       int
+	executor interface {
 		isTraceWorkers() bool
 		getWorksItems() chan workItem
 		getWorkerWaitGroup() *sync.WaitGroup
 	}
-	hasEnded     atomic.Bool
+
 	lastReceived atomic.Value
 
+	stateChange sync.Mutex
+	running     atomic.Bool
+	shutdown    atomic.Bool
+	interrupted atomic.Bool
+	interrupter chan struct{}
+
 	log zerolog.Logger `ignore:"true"`
 }
 
@@ -49,32 +53,57 @@ func (w *worker) initialize() {
 	w.shutdown.Store(false)
 	w.interrupted.Store(false)
 	w.interrupter = make(chan struct{}, 1)
-	w.hasEnded.Store(false)
 	w.lastReceived.Store(time.Now())
 }
 
-func (w *worker) work() {
+func (w *worker) start() {
+	w.stateChange.Lock()
+	defer w.stateChange.Unlock()
+	if w.running.Load() {
+		log.Warn().Msg("Worker already started")
+		return
+	}
+	if w.executor.isTraceWorkers() {
+		w.log.Debug().Msgf("Starting worker\n%s", w)
+	}
 	w.executor.getWorkerWaitGroup().Add(1)
+	go w.work()
+}
+
+func (w *worker) stop(interrupt bool) {
+	w.stateChange.Lock()
+	defer w.stateChange.Unlock()
+	if !w.running.Load() {
+		log.Warn().Msg("Worker not running")
+		return
+	}
+	w.shutdown.Store(true)
+	if interrupt {
+		w.interrupted.Store(true)
+		close(w.interrupter)
+	}
+}
+
+func (w *worker) work() {
 	defer w.executor.getWorkerWaitGroup().Done()
 	defer func() {
 		if err := recover(); err != nil {
 			w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
-		}
-		if !w.shutdown.Load() {
-			// if we are not in shutdown we continue
-			w.work()
+			if !w.shutdown.Load() {
+				// if we are not in shutdown we continue
+				w.start()
+			}
 		}
 	}()
+	w.running.Store(true)
+	defer w.running.Store(false)
 	workerLog := w.log.With().Int("Worker id", w.id).Logger()
 	if !w.executor.isTraceWorkers() {
 		workerLog = zerolog.Nop()
 	}
-	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
-	w.hasEnded.Store(false)
-	workerLog.Debug().Msgf("setting to not ended")
 
 	for !w.shutdown.Load() {
-		workerLog.Debug().Msg("Working")
+		workerLog.Trace().Msg("Working")
 		select {
 		case _workItem := <-w.executor.getWorksItems():
 			w.lastReceived.Store(time.Now())
@@ -92,6 +121,5 @@ func (w *worker) work() {
 			workerLog.Debug().Msg("We got interrupted")
 		}
 	}
-	w.hasEnded.Store(true)
-	workerLog.Debug().Msg("setting to ended")
+	workerLog.Trace().Msg("done")
 }
diff --git a/plc4go/spi/pool/worker_plc4xgen.go b/plc4go/spi/pool/worker_plc4xgen.go
index 23e8d1689c..9f132d3036 100644
--- a/plc4go/spi/pool/worker_plc4xgen.go
+++ b/plc4go/spi/pool/worker_plc4xgen.go
@@ -47,23 +47,6 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
 		return err
 	}
 
-	if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
-		return err
-	}
-
-	if err := writeBuffer.WriteBit("interrupted", d.interrupted.Load()); err != nil {
-		return err
-	}
-
-	_interrupter_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.interrupter))
-	if err := writeBuffer.WriteString("interrupter", uint32(len(_interrupter_plx4gen_description)*8), "UTF-8", _interrupter_plx4gen_description); err != nil {
-		return err
-	}
-
-	if err := writeBuffer.WriteBit("hasEnded", d.hasEnded.Load()); err != nil {
-		return err
-	}
-
 	if d.lastReceived.Load() != nil {
 		if serializableField, ok := d.lastReceived.Load().(utils.Serializable); ok {
 			if err := writeBuffer.PushContext("lastReceived"); err != nil {
@@ -82,6 +65,23 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
 			}
 		}
 	}
+
+	if err := writeBuffer.WriteBit("running", d.running.Load()); err != nil {
+		return err
+	}
+
+	if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
+		return err
+	}
+
+	if err := writeBuffer.WriteBit("interrupted", d.interrupted.Load()); err != nil {
+		return err
+	}
+
+	_interrupter_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.interrupter))
+	if err := writeBuffer.WriteString("interrupter", uint32(len(_interrupter_plx4gen_description)*8), "UTF-8", _interrupter_plx4gen_description); err != nil {
+		return err
+	}
 	if err := writeBuffer.PopContext("worker"); err != nil {
 		return err
 	}
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 120b9548dc..d781466b43 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -20,7 +20,9 @@
 package pool
 
 import (
+	"github.com/rs/zerolog"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -58,6 +60,118 @@ func Test_worker_initialize(t *testing.T) {
 	}
 }
 
+func Test_worker_start(t *testing.T) {
+	type fields struct {
+		id       int
+		executor interface {
+			isTraceWorkers() bool
+			getWorksItems() chan workItem
+			getWorkerWaitGroup() *sync.WaitGroup
+		}
+		lastReceived atomic.Value
+		interrupter  chan struct{}
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, worker *worker)
+	}{
+		{
+			name: "start it",
+			fields: fields{
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								// No-op
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
+			},
+		},
+		{
+			name: "start started",
+			manipulator: func(t *testing.T, worker *worker) {
+				worker.running.Store(true)
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:           tt.fields.id,
+				executor:     tt.fields.executor,
+				lastReceived: tt.fields.lastReceived,
+				interrupter:  tt.fields.interrupter,
+				log:          tt.fields.log,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, w)
+			}
+			w.start()
+			t.Cleanup(func() {
+				w.stop(false)
+			})
+		})
+	}
+}
+
+func Test_worker_stop(t *testing.T) {
+	type fields struct {
+		id       int
+		executor interface {
+			isTraceWorkers() bool
+			getWorksItems() chan workItem
+			getWorkerWaitGroup() *sync.WaitGroup
+		}
+		lastReceived atomic.Value
+		interrupter  chan struct{}
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, worker *worker)
+	}{
+		{
+			name: "stop it",
+		},
+		{
+			name: "stop started",
+			fields: fields{
+				interrupter: make(chan struct{}),
+			},
+			manipulator: func(t *testing.T, worker *worker) {
+				worker.running.Store(true)
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:           tt.fields.id,
+				executor:     tt.fields.executor,
+				lastReceived: tt.fields.lastReceived,
+				interrupter:  tt.fields.interrupter,
+				log:          tt.fields.log,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, w)
+			}
+			w.stop(true)
+		})
+	}
+}
+
 func Test_worker_work(t *testing.T) {
 	type fields struct {
 		id       int
@@ -96,7 +210,7 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeFirstValidation: 50 * time.Millisecond,
 			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
+				assert.True(t, w.running.Load(), "should be running")
 			},
 			manipulator: func(w *worker) {
 				w.shutdown.Store(true)
@@ -104,7 +218,7 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeSecondValidation: 150 * time.Millisecond,
 			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
+				assert.False(t, w.running.Load(), "should not be running")
 			},
 		},
 		{
@@ -130,14 +244,14 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeFirstValidation: 50 * time.Millisecond,
 			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
+				assert.True(t, w.running.Load(), "should be running")
 			},
 			manipulator: func(w *worker) {
 				w.shutdown.Store(true)
 			},
 			timeBeforeSecondValidation: 150 * time.Millisecond,
 			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
+				assert.False(t, w.running.Load(), "should not be running")
 			},
 		},
 		{
@@ -154,7 +268,7 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeFirstValidation: 50 * time.Millisecond,
 			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
+				assert.True(t, w.running.Load(), "should be running")
 			},
 			manipulator: func(w *worker) {
 				w.shutdown.Store(true)
@@ -162,7 +276,7 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeSecondValidation: 150 * time.Millisecond,
 			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
+				assert.False(t, w.running.Load(), "should not be running")
 			},
 		},
 		{
@@ -203,6 +317,7 @@ func Test_worker_work(t *testing.T) {
 				executor:    tt.fields.executor,
 				log:         produceTestingLogger(t),
 			}
+			w.executor.getWorkerWaitGroup().Add(1)
 			go w.work()
 			if tt.firstValidation != nil {
 				time.Sleep(tt.timeBeforeFirstValidation)
@@ -238,11 +353,11 @@ func Test_worker_String(t *testing.T) {
 		{
 			name: "string it",
 			want: `
-╔═worker════════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║
-║║0x0000000000000000 0║║b0 false ║║  b0 false  ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║
-║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║
-╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═worker═══════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║
+║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║
+║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index d858749a97..63041083ea 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -598,29 +598,29 @@ func Test_requestTransactionManager_String(t *testing.T) {
 				traceTransactionManagerTransactions: true,
 			},
 			want: `
-╔═requestTransactionManager════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗                       ║
-║║      ╔═transactionId╗╔═completed╗       ║║   0x0000000000000003 3    ║║    0x00000004 4     ║                       ║
-║║      ║ 0x00000002 2 ║║ b0 false ║       ║╚═══════════════════════════╝╚═════════════════════╝                       ║
-║║      ╚══════════════╝╚══════════╝       ║                                                                           ║
-║╚═════════════════════════════════════════╝                                                                           ║
-║╔═executor/executor═══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
-║║╔═running╗╔═shutdown╗                                                                                    ║║b0 false ║║
-║║║b0 false║║b0 false ║                                                                                    ║╚═════════╝║
-║║╚════════╝╚═════════╝                                                                                    ║           ║
-║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║           ║
-║║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║           ║
-║║║║0x0000000000000000 0║║b0 false ║║  b0 false  ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║           ║
-║║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║           ║
-║║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║           ║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                      ║           ║
-║║║0x0000000000000001 1║║0 element(s)║║  b0 false   ║                                                      ║           ║
-║║╚════════════════════╝╚════════════╝╚═════════════╝                                                      ║           ║
-║╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝           ║
-║╔═traceTransactionManagerTransactions╗                                                                                ║
-║║              b1 true               ║                                                                                ║
-║╚════════════════════════════════════╝                                                                                ║
-╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═requestTransactionManager═══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗                      ║
+║║      ╔═transactionId╗╔═completed╗       ║║   0x0000000000000003 3    ║║    0x00000004 4     ║                      ║
+║║      ║ 0x00000002 2 ║║ b0 false ║       ║╚═══════════════════════════╝╚═════════════════════╝                      ║
+║║      ╚══════════════╝╚══════════╝       ║                                                                          ║
+║╚═════════════════════════════════════════╝                                                                          ║
+║╔═executor/executor══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
+║║╔═running╗╔═shutdown╗                                                                                   ║║b0 false ║║
+║║║b0 false║║b0 false ║                                                                                   ║╚═════════╝║
+║║╚════════╝╚═════════╝                                                                                   ║           ║
+║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║           ║
+║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║           ║
+║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║           ║
+║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║           ║
+║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║           ║
+║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║           ║
+║║║0x0000000000000001 1║║0 element(s)║║  b0 false   ║                                                     ║           ║
+║║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║           ║
+║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝           ║
+║╔═traceTransactionManagerTransactions╗                                                                               ║
+║║              b1 true               ║                                                                               ║
+║╚════════════════════════════════════╝                                                                               ║
+╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {