You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 13:50:56 UTC
[plc4x] 01/05: fix(plc4go/spi): fix race issues in worker pool
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 5e51e6606dc9fe58d8324a5e2e102e0c4537aeb4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 14:18:31 2023 +0200
fix(plc4go/spi): fix race issues in worker pool
---
plc4go/internal/cbus/Browser_test.go | 3 +-
plc4go/internal/cbus/CBusMessageMapper_test.go | 2 +-
plc4go/spi/pool/dynamicExecutor.go | 5 +-
plc4go/spi/pool/dynamicExecutor_test.go | 36 +++---
plc4go/spi/pool/executor.go | 11 +-
plc4go/spi/pool/executor_test.go | 27 ++--
plc4go/spi/pool/worker.go | 64 +++++++---
plc4go/spi/pool/worker_plc4xgen.go | 34 ++---
plc4go/spi/pool/worker_test.go | 137 +++++++++++++++++++--
.../transactions/RequestTransactionManager_test.go | 46 +++----
10 files changed, 251 insertions(+), 114 deletions(-)
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 501eb7371a..3fef9b7ee8 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -162,8 +162,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
})
connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
if err := connectionConnectResult.GetErr(); err != nil {
- t.Error(err)
- t.FailNow()
+ t.Fatal(err)
}
fields.connection = connectionConnectResult.GetConnection()
t.Cleanup(func() {
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 1bacc4775b..588ba63d81 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1479,7 +1479,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- t.Logf("Submitting No-Op to transaction %v", transaction)
+ t.Logf("Submitting No-Op to transaction\n%v", transaction)
transaction.Submit(func(transaction transactions.RequestTransaction) {
// NO-OP
})
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 7a1b7d5381..e2cf3afb56 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -91,7 +91,7 @@ func (e *dynamicExecutor) Start() {
e.worker = append(e.worker, _worker)
_worker.initialize()
workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
- go _worker.work()
+ _worker.start()
e.currentNumberOfWorkers.Add(1)
} else {
workerLog.Trace().Msg("Nothing to scale")
@@ -133,8 +133,7 @@ func (e *dynamicExecutor) Start() {
workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived.Load(), deadline)
if _worker.lastReceived.Load().(time.Time).Before(deadline) {
workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
- _worker.interrupted.Store(true)
- close(_worker.interrupter)
+ _worker.stop(true)
e.currentNumberOfWorkers.Add(-1)
} else {
workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 107486eb52..e58ffd0f13 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -198,24 +198,24 @@ func Test_dynamicExecutor_String(t *testing.T) {
maxNumberOfWorkers: 3,
},
want: `
-╔═dynamicExecutor═══════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═executor════════════════════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═running╗╔═shutdown╗ ║║
-║║║b0 false║║b0 false ║ ║║
-║║╚════════╝╚═════════╝ ║║
-║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║║
-║║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║║
-║║║║0x0000000000000000 0║║b0 false ║║ b0 false ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║║
-║║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║║
-║║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║║
-║║║0x0000000000000000 0║║0 element(s)║║ b0 false ║ ║║
-║║╚════════════════════╝╚════════════╝╚═════════════╝ ║║
-║╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗ ║
-║║0x0000000000000003 3║║ 0x00000000 0 ║║0 element(s)║ ║
-║╚════════════════════╝╚═══════════════════════╝╚════════════╝ ║
-╚═══════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═dynamicExecutor══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═running╗╔═shutdown╗ ║║
+║║║b0 false║║b0 false ║ ║║
+║║╚════════╝╚═════════╝ ║║
+║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║║
+║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║║
+║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║║
+║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║
+║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
+║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║║
+║║║0x0000000000000000 0║║0 element(s)║║ b0 false ║ ║║
+║║╚════════════════════╝╚════════════╝╚═════════════╝ ║║
+║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗ ║
+║║0x0000000000000003 3║║ 0x00000000 0 ║║0 element(s)║ ║
+║╚════════════════════╝╚═══════════════════════╝╚════════════╝ ║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 654edf2973..794bfb4221 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -93,9 +93,9 @@ func (e *executor) Start() {
e.running = true
e.shutdown = false
for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.initialize()
- go worker.work()
+ _worker := e.worker[i]
+ _worker.initialize()
+ _worker.start()
}
}
@@ -109,10 +109,7 @@ func (e *executor) Stop() {
}
e.shutdown = true
for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.shutdown.Store(true)
- worker.interrupted.Store(true)
- close(worker.interrupter)
+ e.worker[i].stop(true)
}
e.running = false
e.shutdown = false
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index ee126bb989..15ecac74ca 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -434,7 +434,6 @@ func Test_executor_String(t *testing.T) {
id: 1,
shutdown: atomic.Bool{},
interrupted: atomic.Bool{},
- hasEnded: atomic.Bool{},
lastReceived: func() atomic.Value {
value := atomic.Value{}
value.Store(time.Time{})
@@ -446,19 +445,19 @@ func Test_executor_String(t *testing.T) {
traceWorkers: true,
},
want: `
-╔═executor════════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═running╗╔═shutdown╗ ║
-║║b1 true ║║ b1 true ║ ║
-║╚════════╝╚═════════╝ ║
-║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║
-║║║0x0000000000000001 1║║b0 false ║║ b0 false ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║
-║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║
-║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║
-║║0x0000000000000002 2║║0 element(s)║║ b1 true ║ ║
-║╚════════════════════╝╚════════════╝╚═════════════╝ ║
-╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═running╗╔═shutdown╗ ║
+║║b1 true ║║ b1 true ║ ║
+║╚════════╝╚═════════╝ ║
+║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║
+║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║
+║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║
+║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║
+║║0x0000000000000002 2║║0 element(s)║║ b1 true ║ ║
+║╚════════════════════╝╚════════════╝╚═════════════╝ ║
+╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 0fa5ae597b..7b1308de73 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -20,6 +20,7 @@
package pool
import (
+ "github.com/rs/zerolog/log"
"runtime/debug"
"sync"
"sync/atomic"
@@ -30,18 +31,21 @@ import (
//go:generate go run ../../tools/plc4xgenerator/gen.go -type=worker
type worker struct {
- id int
- shutdown atomic.Bool
- interrupted atomic.Bool
- interrupter chan struct{}
- executor interface {
+ id int
+ executor interface {
isTraceWorkers() bool
getWorksItems() chan workItem
getWorkerWaitGroup() *sync.WaitGroup
}
- hasEnded atomic.Bool
+
lastReceived atomic.Value
+ stateChange sync.Mutex
+ running atomic.Bool
+ shutdown atomic.Bool
+ interrupted atomic.Bool
+ interrupter chan struct{}
+
log zerolog.Logger `ignore:"true"`
}
@@ -49,32 +53,57 @@ func (w *worker) initialize() {
w.shutdown.Store(false)
w.interrupted.Store(false)
w.interrupter = make(chan struct{}, 1)
- w.hasEnded.Store(false)
w.lastReceived.Store(time.Now())
}
-func (w *worker) work() {
+func (w *worker) start() {
+ w.stateChange.Lock()
+ defer w.stateChange.Unlock()
+ if w.running.Load() {
+ log.Warn().Msg("Worker already started")
+ return
+ }
+ if w.executor.isTraceWorkers() {
+ w.log.Debug().Msgf("Starting worker\n%s", w)
+ }
w.executor.getWorkerWaitGroup().Add(1)
+ go w.work()
+}
+
+func (w *worker) stop(interrupt bool) {
+ w.stateChange.Lock()
+ defer w.stateChange.Unlock()
+ if !w.running.Load() {
+ log.Warn().Msg("Worker not running")
+ return
+ }
+ w.shutdown.Store(true)
+ if interrupt {
+ w.interrupted.Store(true)
+ close(w.interrupter)
+ }
+}
+
+func (w *worker) work() {
defer w.executor.getWorkerWaitGroup().Done()
defer func() {
if err := recover(); err != nil {
w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
- }
- if !w.shutdown.Load() {
- // if we are not in shutdown we continue
- w.work()
+ if !w.shutdown.Load() {
+ // if we are not in shutdown we continue
+ w.start()
+ }
}
}()
+ w.running.Store(true)
+ defer w.running.Store(false)
workerLog := w.log.With().Int("Worker id", w.id).Logger()
if !w.executor.isTraceWorkers() {
workerLog = zerolog.Nop()
}
- workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
- w.hasEnded.Store(false)
- workerLog.Debug().Msgf("setting to not ended")
for !w.shutdown.Load() {
- workerLog.Debug().Msg("Working")
+ workerLog.Trace().Msg("Working")
select {
case _workItem := <-w.executor.getWorksItems():
w.lastReceived.Store(time.Now())
@@ -92,6 +121,5 @@ func (w *worker) work() {
workerLog.Debug().Msg("We got interrupted")
}
}
- w.hasEnded.Store(true)
- workerLog.Debug().Msg("setting to ended")
+ workerLog.Trace().Msg("done")
}
diff --git a/plc4go/spi/pool/worker_plc4xgen.go b/plc4go/spi/pool/worker_plc4xgen.go
index 23e8d1689c..9f132d3036 100644
--- a/plc4go/spi/pool/worker_plc4xgen.go
+++ b/plc4go/spi/pool/worker_plc4xgen.go
@@ -47,23 +47,6 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
return err
}
- if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
- return err
- }
-
- if err := writeBuffer.WriteBit("interrupted", d.interrupted.Load()); err != nil {
- return err
- }
-
- _interrupter_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.interrupter))
- if err := writeBuffer.WriteString("interrupter", uint32(len(_interrupter_plx4gen_description)*8), "UTF-8", _interrupter_plx4gen_description); err != nil {
- return err
- }
-
- if err := writeBuffer.WriteBit("hasEnded", d.hasEnded.Load()); err != nil {
- return err
- }
-
if d.lastReceived.Load() != nil {
if serializableField, ok := d.lastReceived.Load().(utils.Serializable); ok {
if err := writeBuffer.PushContext("lastReceived"); err != nil {
@@ -82,6 +65,23 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
}
}
}
+
+ if err := writeBuffer.WriteBit("running", d.running.Load()); err != nil {
+ return err
+ }
+
+ if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
+ return err
+ }
+
+ if err := writeBuffer.WriteBit("interrupted", d.interrupted.Load()); err != nil {
+ return err
+ }
+
+ _interrupter_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.interrupter))
+ if err := writeBuffer.WriteString("interrupter", uint32(len(_interrupter_plx4gen_description)*8), "UTF-8", _interrupter_plx4gen_description); err != nil {
+ return err
+ }
if err := writeBuffer.PopContext("worker"); err != nil {
return err
}
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 120b9548dc..d781466b43 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -20,7 +20,9 @@
package pool
import (
+ "github.com/rs/zerolog"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -58,6 +60,118 @@ func Test_worker_initialize(t *testing.T) {
}
}
+func Test_worker_start(t *testing.T) {
+ type fields struct {
+ id int
+ executor interface {
+ isTraceWorkers() bool
+ getWorksItems() chan workItem
+ getWorkerWaitGroup() *sync.WaitGroup
+ }
+ lastReceived atomic.Value
+ interrupter chan struct{}
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ manipulator func(t *testing.T, worker *worker)
+ }{
+ {
+ name: "start it",
+ fields: fields{
+ executor: func() *executor {
+ e := &executor{
+ workItems: make(chan workItem),
+ traceWorkers: true,
+ }
+ go func() {
+ e.workItems <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ // No-op
+ },
+ completionFuture: &future{},
+ }
+ }()
+ return e
+ }(),
+ },
+ },
+ {
+ name: "start started",
+ manipulator: func(t *testing.T, worker *worker) {
+ worker.running.Store(true)
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ w := &worker{
+ id: tt.fields.id,
+ executor: tt.fields.executor,
+ lastReceived: tt.fields.lastReceived,
+ interrupter: tt.fields.interrupter,
+ log: tt.fields.log,
+ }
+ if tt.manipulator != nil {
+ tt.manipulator(t, w)
+ }
+ w.start()
+ t.Cleanup(func() {
+ w.stop(false)
+ })
+ })
+ }
+}
+
+func Test_worker_stop(t *testing.T) {
+ type fields struct {
+ id int
+ executor interface {
+ isTraceWorkers() bool
+ getWorksItems() chan workItem
+ getWorkerWaitGroup() *sync.WaitGroup
+ }
+ lastReceived atomic.Value
+ interrupter chan struct{}
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ manipulator func(t *testing.T, worker *worker)
+ }{
+ {
+ name: "stop it",
+ },
+ {
+ name: "stop started",
+ fields: fields{
+ interrupter: make(chan struct{}),
+ },
+ manipulator: func(t *testing.T, worker *worker) {
+ worker.running.Store(true)
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ w := &worker{
+ id: tt.fields.id,
+ executor: tt.fields.executor,
+ lastReceived: tt.fields.lastReceived,
+ interrupter: tt.fields.interrupter,
+ log: tt.fields.log,
+ }
+ if tt.manipulator != nil {
+ tt.manipulator(t, w)
+ }
+ w.stop(true)
+ })
+ }
+}
+
func Test_worker_work(t *testing.T) {
type fields struct {
id int
@@ -96,7 +210,7 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeFirstValidation: 50 * time.Millisecond,
firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
+ assert.True(t, w.running.Load(), "should be running")
},
manipulator: func(w *worker) {
w.shutdown.Store(true)
@@ -104,7 +218,7 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeSecondValidation: 150 * time.Millisecond,
secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
+ assert.False(t, w.running.Load(), "should not be running")
},
},
{
@@ -130,14 +244,14 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeFirstValidation: 50 * time.Millisecond,
firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
+ assert.True(t, w.running.Load(), "should be running")
},
manipulator: func(w *worker) {
w.shutdown.Store(true)
},
timeBeforeSecondValidation: 150 * time.Millisecond,
secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
+ assert.False(t, w.running.Load(), "should not be running")
},
},
{
@@ -154,7 +268,7 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeFirstValidation: 50 * time.Millisecond,
firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
+ assert.True(t, w.running.Load(), "should be running")
},
manipulator: func(w *worker) {
w.shutdown.Store(true)
@@ -162,7 +276,7 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeSecondValidation: 150 * time.Millisecond,
secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
+ assert.False(t, w.running.Load(), "should not be running")
},
},
{
@@ -203,6 +317,7 @@ func Test_worker_work(t *testing.T) {
executor: tt.fields.executor,
log: produceTestingLogger(t),
}
+ w.executor.getWorkerWaitGroup().Add(1)
go w.work()
if tt.firstValidation != nil {
time.Sleep(tt.timeBeforeFirstValidation)
@@ -238,11 +353,11 @@ func Test_worker_String(t *testing.T) {
{
name: "string it",
want: `
-╔═worker════════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║
-║║0x0000000000000000 0║║b0 false ║║ b0 false ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║
-║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║
-╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═worker═══════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║
+║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║
+║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index d858749a97..63041083ea 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -598,29 +598,29 @@ func Test_requestTransactionManager_String(t *testing.T) {
traceTransactionManagerTransactions: true,
},
want: `
-╔═requestTransactionManager════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗ ║
-║║ ╔═transactionId╗╔═completed╗ ║║ 0x0000000000000003 3 ║║ 0x00000004 4 ║ ║
-║║ ║ 0x00000002 2 ║║ b0 false ║ ║╚═══════════════════════════╝╚═════════════════════╝ ║
-║║ ╚══════════════╝╚══════════╝ ║ ║
-║╚═════════════════════════════════════════╝ ║
-║╔═executor/executor═══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
-║║╔═running╗╔═shutdown╗ ║║b0 false ║║
-║║║b0 false║║b0 false ║ ║╚═════════╝║
-║║╚════════╝╚═════════╝ ║ ║
-║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║ ║
-║║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║ ║
-║║║║0x0000000000000000 0║║b0 false ║║ b0 false ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║ ║
-║║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║ ║
-║║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║ ║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║ ║
-║║║0x0000000000000001 1║║0 element(s)║║ b0 false ║ ║ ║
-║║╚════════════════════╝╚════════════╝╚═════════════╝ ║ ║
-║╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
-║╔═traceTransactionManagerTransactions╗ ║
-║║ b1 true ║ ║
-║╚════════════════════════════════════╝ ║
-╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═requestTransactionManager═══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗ ║
+║║ ╔═transactionId╗╔═completed╗ ║║ 0x0000000000000003 3 ║║ 0x00000004 4 ║ ║
+║║ ║ 0x00000002 2 ║║ b0 false ║ ║╚═══════════════════════════╝╚═════════════════════╝ ║
+║║ ╚══════════════╝╚══════════╝ ║ ║
+║╚═════════════════════════════════════════╝ ║
+║╔═executor/executor══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
+║║╔═running╗╔═shutdown╗ ║║b0 false ║║
+║║║b0 false║║b0 false ║ ║╚═════════╝║
+║║╚════════╝╚═════════╝ ║ ║
+║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║ ║
+║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║ ║
+║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║ ║
+║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║ ║
+║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║ ║
+║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║ ║
+║║║0x0000000000000001 1║║0 element(s)║║ b0 false ║ ║ ║
+║║╚════════════════════╝╚════════════╝╚═════════════╝ ║ ║
+║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
+║╔═traceTransactionManagerTransactions╗ ║
+║║ b1 true ║ ║
+║╚════════════════════════════════════╝ ║
+╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {