You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 13:50:55 UTC

[plc4x] branch develop updated (6dac19d75e -> 0458529ee1)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 6dac19d75e test(plc4go/cbus): shutdown workers of discoverer once done
     new 5e51e6606d fix(plc4go/spi): fix race issues in worker pool
     new 915b6b5408 feat(plc4go/tools): add support for atomic.Pointer to plc4xgenerator
     new 7089ac3b58 fix(plc4go/tools): fix atomic.Pointer support
     new dcf630aa84 fix(plc4go/spi): fix worker logging on wrong logger
     new 0458529ee1 fix(plc4go/spi): fix race issues in request transaction

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/Browser_test.go               |   3 +-
 plc4go/internal/cbus/CBusMessageMapper_test.go     |   2 +-
 plc4go/spi/pool/dynamicExecutor.go                 |   5 +-
 plc4go/spi/pool/dynamicExecutor_test.go            |  36 ++---
 plc4go/spi/pool/executor.go                        |  11 +-
 plc4go/spi/pool/executor_test.go                   |  27 ++--
 plc4go/spi/pool/worker.go                          |  64 ++++++---
 plc4go/spi/pool/worker_plc4xgen.go                 |  34 ++---
 plc4go/spi/pool/worker_test.go                     | 137 ++++++++++++++++--
 plc4go/spi/transactions/RequestTransaction.go      |  25 +++-
 .../spi/transactions/RequestTransactionManager.go  |   6 +-
 .../transactions/RequestTransactionManager_test.go |  72 +++++-----
 plc4go/spi/transactions/RequestTransaction_test.go | 154 +++++++++++----------
 .../transactions/requestTransaction_plc4xgen.go    |   7 +-
 plc4go/tools/plc4xgenerator/gen.go                 |  34 +++++
 15 files changed, 406 insertions(+), 211 deletions(-)


[plc4x] 03/05: fix(plc4go/tools): fix atomic.Pointer support

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 7089ac3b584ba7fd776905c8ed29d9eaa4a59eb4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 15:45:00 2023 +0200

    fix(plc4go/tools): fix atomic.Pointer support
---
 plc4go/tools/plc4xgenerator/gen.go | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/plc4go/tools/plc4xgenerator/gen.go b/plc4go/tools/plc4xgenerator/gen.go
index 3644e4ceca..54ceab6101 100644
--- a/plc4go/tools/plc4xgenerator/gen.go
+++ b/plc4go/tools/plc4xgenerator/gen.go
@@ -284,7 +284,7 @@ func (g *Generator) generate(typeName string) {
 				xIdent, xIsIdent := fieldType.X.(*ast.Ident)
 				sel := fieldType.Sel
 				if xIsIdent && xIdent.Name == "atomic" && sel.Name == "Pointer" {
-					g.Printf(serializableFieldTemplate, "(*d."+field.name+".Load())", fieldNameUntitled)
+					g.Printf(atomicPointerFieldTemplate, "d."+field.name, field.name, fieldNameUntitled)
 					continue
 				}
 			}
@@ -556,6 +556,28 @@ var serializableFieldTemplate = `
 	}
 `
 
+var atomicPointerFieldTemplate = `
+	if %[2]sLoaded :=%[1]s.Load(); %[2]sLoaded != nil && *%[2]sLoaded != nil {
+		%[2]s := *%[2]sLoaded
+		if serializableField, ok := %[2]s.(utils.Serializable); ok {
+			if err := writeBuffer.PushContext(%[3]s); err != nil {
+				return err
+			}
+			if err := serializableField.SerializeWithWriteBuffer(ctx, writeBuffer); err != nil {
+				return err
+			}
+			if err := writeBuffer.PopContext(%[3]s); err != nil {
+				return err
+			}
+		} else {
+			stringValue := fmt.Sprintf("%%v", %[2]s)
+			if err := writeBuffer.WriteString(%[3]s, uint32(len(stringValue)*8), "UTF-8", stringValue); err != nil {
+				return err
+			}
+		}
+	}
+`
+
 var byteFieldSerialize = `
 	if err := writeBuffer.WriteByte(%[2]s, %[1]s); err != nil {
 		return err


[plc4x] 04/05: fix(plc4go/spi): fix worker logging on wrong logger

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit dcf630aa840b03e44549d88395535b5fbe613d6b
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 15:48:41 2023 +0200

    fix(plc4go/spi): fix worker logging on wrong logger
---
 plc4go/spi/pool/worker.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 7b1308de73..6017f6c78c 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -74,7 +74,7 @@ func (w *worker) stop(interrupt bool) {
 	w.stateChange.Lock()
 	defer w.stateChange.Unlock()
 	if !w.running.Load() {
-		log.Warn().Msg("Worker not running")
+		w.log.Warn().Msg("Worker not running")
 		return
 	}
 	w.shutdown.Store(true)


[plc4x] 02/05: feat(plc4go/tools): add support for atomic.Pointer to plc4xgenerator

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 915b6b54089bcd157ac9103f460b85dd1e0a241d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 15:28:07 2023 +0200

    feat(plc4go/tools): add support for atomic.Pointer to plc4xgenerator
---
 plc4go/tools/plc4xgenerator/gen.go | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/plc4go/tools/plc4xgenerator/gen.go b/plc4go/tools/plc4xgenerator/gen.go
index 1e4fe23f60..3644e4ceca 100644
--- a/plc4go/tools/plc4xgenerator/gen.go
+++ b/plc4go/tools/plc4xgenerator/gen.go
@@ -278,6 +278,18 @@ func (g *Generator) generate(typeName string) {
 				}
 			}
 			g.Printf(serializableFieldTemplate, "d."+field.name, fieldNameUntitled)
+		case *ast.IndexExpr:
+			x := fieldType.X
+			if fieldType, isxFieldSelector := x.(*ast.SelectorExpr); isxFieldSelector { // TODO: we need to refactor this so we can reuse...
+				xIdent, xIsIdent := fieldType.X.(*ast.Ident)
+				sel := fieldType.Sel
+				if xIsIdent && xIdent.Name == "atomic" && sel.Name == "Pointer" {
+					g.Printf(serializableFieldTemplate, "(*d."+field.name+".Load())", fieldNameUntitled)
+					continue
+				}
+			}
+			fmt.Printf("no support yet for %#q\n", fieldType)
+			continue
 		case *ast.Ident:
 			switch fieldType.Name {
 			case "byte":


[plc4x] 01/05: fix(plc4go/spi): fix race issues in worker pool

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 5e51e6606dc9fe58d8324a5e2e102e0c4537aeb4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 14:18:31 2023 +0200

    fix(plc4go/spi): fix race issues in worker pool
---
 plc4go/internal/cbus/Browser_test.go               |   3 +-
 plc4go/internal/cbus/CBusMessageMapper_test.go     |   2 +-
 plc4go/spi/pool/dynamicExecutor.go                 |   5 +-
 plc4go/spi/pool/dynamicExecutor_test.go            |  36 +++---
 plc4go/spi/pool/executor.go                        |  11 +-
 plc4go/spi/pool/executor_test.go                   |  27 ++--
 plc4go/spi/pool/worker.go                          |  64 +++++++---
 plc4go/spi/pool/worker_plc4xgen.go                 |  34 ++---
 plc4go/spi/pool/worker_test.go                     | 137 +++++++++++++++++++--
 .../transactions/RequestTransactionManager_test.go |  46 +++----
 10 files changed, 251 insertions(+), 114 deletions(-)

diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 501eb7371a..3fef9b7ee8 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -162,8 +162,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 				})
 				connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
 				if err := connectionConnectResult.GetErr(); err != nil {
-					t.Error(err)
-					t.FailNow()
+					t.Fatal(err)
 				}
 				fields.connection = connectionConnectResult.GetConnection()
 				t.Cleanup(func() {
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 1bacc4775b..588ba63d81 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1479,7 +1479,7 @@ func TestMapEncodedReply(t *testing.T) {
 					assert.NoError(t, transactionManager.Close())
 				})
 				transaction := transactionManager.StartTransaction()
-				t.Logf("Submitting No-Op to transaction %v", transaction)
+				t.Logf("Submitting No-Op to transaction\n%v", transaction)
 				transaction.Submit(func(transaction transactions.RequestTransaction) {
 					// NO-OP
 				})
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 7a1b7d5381..e2cf3afb56 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -91,7 +91,7 @@ func (e *dynamicExecutor) Start() {
 				e.worker = append(e.worker, _worker)
 				_worker.initialize()
 				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
-				go _worker.work()
+				_worker.start()
 				e.currentNumberOfWorkers.Add(1)
 			} else {
 				workerLog.Trace().Msg("Nothing to scale")
@@ -133,8 +133,7 @@ func (e *dynamicExecutor) Start() {
 				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived.Load(), deadline)
 				if _worker.lastReceived.Load().(time.Time).Before(deadline) {
 					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
-					_worker.interrupted.Store(true)
-					close(_worker.interrupter)
+					_worker.stop(true)
 					e.currentNumberOfWorkers.Add(-1)
 				} else {
 					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 107486eb52..e58ffd0f13 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -198,24 +198,24 @@ func Test_dynamicExecutor_String(t *testing.T) {
 				maxNumberOfWorkers: 3,
 			},
 			want: `
-╔═dynamicExecutor═══════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═executor════════════════════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═running╗╔═shutdown╗                                                                                    ║║
-║║║b0 false║║b0 false ║                                                                                    ║║
-║║╚════════╝╚═════════╝                                                                                    ║║
-║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║║
-║║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║║
-║║║║0x0000000000000000 0║║b0 false ║║  b0 false  ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║║
-║║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║║
-║║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                      ║║
-║║║0x0000000000000000 0║║0 element(s)║║  b0 false   ║                                                      ║║
-║║╚════════════════════╝╚════════════╝╚═════════════╝                                                      ║║
-║╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗                                              ║
-║║0x0000000000000003 3║║     0x00000000 0      ║║0 element(s)║                                              ║
-║╚════════════════════╝╚═══════════════════════╝╚════════════╝                                              ║
-╚═══════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═dynamicExecutor══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═running╗╔═shutdown╗                                                                                   ║║
+║║║b0 false║║b0 false ║                                                                                   ║║
+║║╚════════╝╚═════════╝                                                                                   ║║
+║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║║
+║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║║
+║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║║
+║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║
+║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
+║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║║
+║║║0x0000000000000000 0║║0 element(s)║║  b0 false   ║                                                     ║║
+║║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║║
+║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗                                             ║
+║║0x0000000000000003 3║║     0x00000000 0      ║║0 element(s)║                                             ║
+║╚════════════════════╝╚═══════════════════════╝╚════════════╝                                             ║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 654edf2973..794bfb4221 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -93,9 +93,9 @@ func (e *executor) Start() {
 	e.running = true
 	e.shutdown = false
 	for i := 0; i < len(e.worker); i++ {
-		worker := e.worker[i]
-		worker.initialize()
-		go worker.work()
+		_worker := e.worker[i]
+		_worker.initialize()
+		_worker.start()
 	}
 }
 
@@ -109,10 +109,7 @@ func (e *executor) Stop() {
 	}
 	e.shutdown = true
 	for i := 0; i < len(e.worker); i++ {
-		worker := e.worker[i]
-		worker.shutdown.Store(true)
-		worker.interrupted.Store(true)
-		close(worker.interrupter)
+		e.worker[i].stop(true)
 	}
 	e.running = false
 	e.shutdown = false
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index ee126bb989..15ecac74ca 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -434,7 +434,6 @@ func Test_executor_String(t *testing.T) {
 						id:          1,
 						shutdown:    atomic.Bool{},
 						interrupted: atomic.Bool{},
-						hasEnded:    atomic.Bool{},
 						lastReceived: func() atomic.Value {
 							value := atomic.Value{}
 							value.Store(time.Time{})
@@ -446,19 +445,19 @@ func Test_executor_String(t *testing.T) {
 				traceWorkers: true,
 			},
 			want: `
-╔═executor════════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═running╗╔═shutdown╗                                                                                    ║
-║║b1 true ║║ b1 true ║                                                                                    ║
-║╚════════╝╚═════════╝                                                                                    ║
-║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║
-║║║0x0000000000000001 1║║b0 false ║║  b0 false  ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║
-║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║
-║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                      ║
-║║0x0000000000000002 2║║0 element(s)║║   b1 true   ║                                                      ║
-║╚════════════════════╝╚════════════╝╚═════════════╝                                                      ║
-╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═running╗╔═shutdown╗                                                                                   ║
+║║b1 true ║║ b1 true ║                                                                                   ║
+║╚════════╝╚═════════╝                                                                                   ║
+║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║
+║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║
+║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║
+║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║
+║║0x0000000000000002 2║║0 element(s)║║   b1 true   ║                                                     ║
+║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║
+╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 0fa5ae597b..7b1308de73 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -20,6 +20,7 @@
 package pool
 
 import (
+	"github.com/rs/zerolog/log"
 	"runtime/debug"
 	"sync"
 	"sync/atomic"
@@ -30,18 +31,21 @@ import (
 
 //go:generate go run ../../tools/plc4xgenerator/gen.go -type=worker
 type worker struct {
-	id          int
-	shutdown    atomic.Bool
-	interrupted atomic.Bool
-	interrupter chan struct{}
-	executor    interface {
+	id       int
+	executor interface {
 		isTraceWorkers() bool
 		getWorksItems() chan workItem
 		getWorkerWaitGroup() *sync.WaitGroup
 	}
-	hasEnded     atomic.Bool
+
 	lastReceived atomic.Value
 
+	stateChange sync.Mutex
+	running     atomic.Bool
+	shutdown    atomic.Bool
+	interrupted atomic.Bool
+	interrupter chan struct{}
+
 	log zerolog.Logger `ignore:"true"`
 }
 
@@ -49,32 +53,57 @@ func (w *worker) initialize() {
 	w.shutdown.Store(false)
 	w.interrupted.Store(false)
 	w.interrupter = make(chan struct{}, 1)
-	w.hasEnded.Store(false)
 	w.lastReceived.Store(time.Now())
 }
 
-func (w *worker) work() {
+func (w *worker) start() {
+	w.stateChange.Lock()
+	defer w.stateChange.Unlock()
+	if w.running.Load() {
+		log.Warn().Msg("Worker already started")
+		return
+	}
+	if w.executor.isTraceWorkers() {
+		w.log.Debug().Msgf("Starting worker\n%s", w)
+	}
 	w.executor.getWorkerWaitGroup().Add(1)
+	go w.work()
+}
+
+func (w *worker) stop(interrupt bool) {
+	w.stateChange.Lock()
+	defer w.stateChange.Unlock()
+	if !w.running.Load() {
+		log.Warn().Msg("Worker not running")
+		return
+	}
+	w.shutdown.Store(true)
+	if interrupt {
+		w.interrupted.Store(true)
+		close(w.interrupter)
+	}
+}
+
+func (w *worker) work() {
 	defer w.executor.getWorkerWaitGroup().Done()
 	defer func() {
 		if err := recover(); err != nil {
 			w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
-		}
-		if !w.shutdown.Load() {
-			// if we are not in shutdown we continue
-			w.work()
+			if !w.shutdown.Load() {
+				// if we are not in shutdown we continue
+				w.start()
+			}
 		}
 	}()
+	w.running.Store(true)
+	defer w.running.Store(false)
 	workerLog := w.log.With().Int("Worker id", w.id).Logger()
 	if !w.executor.isTraceWorkers() {
 		workerLog = zerolog.Nop()
 	}
-	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
-	w.hasEnded.Store(false)
-	workerLog.Debug().Msgf("setting to not ended")
 
 	for !w.shutdown.Load() {
-		workerLog.Debug().Msg("Working")
+		workerLog.Trace().Msg("Working")
 		select {
 		case _workItem := <-w.executor.getWorksItems():
 			w.lastReceived.Store(time.Now())
@@ -92,6 +121,5 @@ func (w *worker) work() {
 			workerLog.Debug().Msg("We got interrupted")
 		}
 	}
-	w.hasEnded.Store(true)
-	workerLog.Debug().Msg("setting to ended")
+	workerLog.Trace().Msg("done")
 }
diff --git a/plc4go/spi/pool/worker_plc4xgen.go b/plc4go/spi/pool/worker_plc4xgen.go
index 23e8d1689c..9f132d3036 100644
--- a/plc4go/spi/pool/worker_plc4xgen.go
+++ b/plc4go/spi/pool/worker_plc4xgen.go
@@ -47,23 +47,6 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
 		return err
 	}
 
-	if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
-		return err
-	}
-
-	if err := writeBuffer.WriteBit("interrupted", d.interrupted.Load()); err != nil {
-		return err
-	}
-
-	_interrupter_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.interrupter))
-	if err := writeBuffer.WriteString("interrupter", uint32(len(_interrupter_plx4gen_description)*8), "UTF-8", _interrupter_plx4gen_description); err != nil {
-		return err
-	}
-
-	if err := writeBuffer.WriteBit("hasEnded", d.hasEnded.Load()); err != nil {
-		return err
-	}
-
 	if d.lastReceived.Load() != nil {
 		if serializableField, ok := d.lastReceived.Load().(utils.Serializable); ok {
 			if err := writeBuffer.PushContext("lastReceived"); err != nil {
@@ -82,6 +65,23 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
 			}
 		}
 	}
+
+	if err := writeBuffer.WriteBit("running", d.running.Load()); err != nil {
+		return err
+	}
+
+	if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
+		return err
+	}
+
+	if err := writeBuffer.WriteBit("interrupted", d.interrupted.Load()); err != nil {
+		return err
+	}
+
+	_interrupter_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.interrupter))
+	if err := writeBuffer.WriteString("interrupter", uint32(len(_interrupter_plx4gen_description)*8), "UTF-8", _interrupter_plx4gen_description); err != nil {
+		return err
+	}
 	if err := writeBuffer.PopContext("worker"); err != nil {
 		return err
 	}
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 120b9548dc..d781466b43 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -20,7 +20,9 @@
 package pool
 
 import (
+	"github.com/rs/zerolog"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -58,6 +60,118 @@ func Test_worker_initialize(t *testing.T) {
 	}
 }
 
+func Test_worker_start(t *testing.T) {
+	type fields struct {
+		id       int
+		executor interface {
+			isTraceWorkers() bool
+			getWorksItems() chan workItem
+			getWorkerWaitGroup() *sync.WaitGroup
+		}
+		lastReceived atomic.Value
+		interrupter  chan struct{}
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, worker *worker)
+	}{
+		{
+			name: "start it",
+			fields: fields{
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								// No-op
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
+			},
+		},
+		{
+			name: "start started",
+			manipulator: func(t *testing.T, worker *worker) {
+				worker.running.Store(true)
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:           tt.fields.id,
+				executor:     tt.fields.executor,
+				lastReceived: tt.fields.lastReceived,
+				interrupter:  tt.fields.interrupter,
+				log:          tt.fields.log,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, w)
+			}
+			w.start()
+			t.Cleanup(func() {
+				w.stop(false)
+			})
+		})
+	}
+}
+
+func Test_worker_stop(t *testing.T) {
+	type fields struct {
+		id       int
+		executor interface {
+			isTraceWorkers() bool
+			getWorksItems() chan workItem
+			getWorkerWaitGroup() *sync.WaitGroup
+		}
+		lastReceived atomic.Value
+		interrupter  chan struct{}
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, worker *worker)
+	}{
+		{
+			name: "stop it",
+		},
+		{
+			name: "stop started",
+			fields: fields{
+				interrupter: make(chan struct{}),
+			},
+			manipulator: func(t *testing.T, worker *worker) {
+				worker.running.Store(true)
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:           tt.fields.id,
+				executor:     tt.fields.executor,
+				lastReceived: tt.fields.lastReceived,
+				interrupter:  tt.fields.interrupter,
+				log:          tt.fields.log,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, w)
+			}
+			w.stop(true)
+		})
+	}
+}
+
 func Test_worker_work(t *testing.T) {
 	type fields struct {
 		id       int
@@ -96,7 +210,7 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeFirstValidation: 50 * time.Millisecond,
 			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
+				assert.True(t, w.running.Load(), "should be running")
 			},
 			manipulator: func(w *worker) {
 				w.shutdown.Store(true)
@@ -104,7 +218,7 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeSecondValidation: 150 * time.Millisecond,
 			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
+				assert.False(t, w.running.Load(), "should not be running")
 			},
 		},
 		{
@@ -130,14 +244,14 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeFirstValidation: 50 * time.Millisecond,
 			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
+				assert.True(t, w.running.Load(), "should be running")
 			},
 			manipulator: func(w *worker) {
 				w.shutdown.Store(true)
 			},
 			timeBeforeSecondValidation: 150 * time.Millisecond,
 			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
+				assert.False(t, w.running.Load(), "should not be running")
 			},
 		},
 		{
@@ -154,7 +268,7 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeFirstValidation: 50 * time.Millisecond,
 			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded.Load(), "should not be ended")
+				assert.True(t, w.running.Load(), "should be running")
 			},
 			manipulator: func(w *worker) {
 				w.shutdown.Store(true)
@@ -162,7 +276,7 @@ func Test_worker_work(t *testing.T) {
 			},
 			timeBeforeSecondValidation: 150 * time.Millisecond,
 			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded.Load(), "should be ended")
+				assert.False(t, w.running.Load(), "should not be running")
 			},
 		},
 		{
@@ -203,6 +317,7 @@ func Test_worker_work(t *testing.T) {
 				executor:    tt.fields.executor,
 				log:         produceTestingLogger(t),
 			}
+			w.executor.getWorkerWaitGroup().Add(1)
 			go w.work()
 			if tt.firstValidation != nil {
 				time.Sleep(tt.timeBeforeFirstValidation)
@@ -238,11 +353,11 @@ func Test_worker_String(t *testing.T) {
 		{
 			name: "string it",
 			want: `
-╔═worker════════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║
-║║0x0000000000000000 0║║b0 false ║║  b0 false  ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║
-║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║
-╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═worker═══════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║
+║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║
+║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index d858749a97..63041083ea 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -598,29 +598,29 @@ func Test_requestTransactionManager_String(t *testing.T) {
 				traceTransactionManagerTransactions: true,
 			},
 			want: `
-╔═requestTransactionManager════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗                       ║
-║║      ╔═transactionId╗╔═completed╗       ║║   0x0000000000000003 3    ║║    0x00000004 4     ║                       ║
-║║      ║ 0x00000002 2 ║║ b0 false ║       ║╚═══════════════════════════╝╚═════════════════════╝                       ║
-║║      ╚══════════════╝╚══════════╝       ║                                                                           ║
-║╚═════════════════════════════════════════╝                                                                           ║
-║╔═executor/executor═══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
-║║╔═running╗╔═shutdown╗                                                                                    ║║b0 false ║║
-║║║b0 false║║b0 false ║                                                                                    ║╚═════════╝║
-║║╚════════╝╚═════════╝                                                                                    ║           ║
-║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║           ║
-║║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║           ║
-║║║║0x0000000000000000 0║║b0 false ║║  b0 false  ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║           ║
-║║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║           ║
-║║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║           ║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                      ║           ║
-║║║0x0000000000000001 1║║0 element(s)║║  b0 false   ║                                                      ║           ║
-║║╚════════════════════╝╚════════════╝╚═════════════╝                                                      ║           ║
-║╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝           ║
-║╔═traceTransactionManagerTransactions╗                                                                                ║
-║║              b1 true               ║                                                                                ║
-║╚════════════════════════════════════╝                                                                                ║
-╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═requestTransactionManager═══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗                      ║
+║║      ╔═transactionId╗╔═completed╗       ║║   0x0000000000000003 3    ║║    0x00000004 4     ║                      ║
+║║      ║ 0x00000002 2 ║║ b0 false ║       ║╚═══════════════════════════╝╚═════════════════════╝                      ║
+║║      ╚══════════════╝╚══════════╝       ║                                                                          ║
+║╚═════════════════════════════════════════╝                                                                          ║
+║╔═executor/executor══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
+║║╔═running╗╔═shutdown╗                                                                                   ║║b0 false ║║
+║║║b0 false║║b0 false ║                                                                                   ║╚═════════╝║
+║║╚════════╝╚═════════╝                                                                                   ║           ║
+║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║           ║
+║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║           ║
+║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║           ║
+║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║           ║
+║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║           ║
+║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║           ║
+║║║0x0000000000000001 1║║0 element(s)║║  b0 false   ║                                                     ║           ║
+║║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║           ║
+║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝           ║
+║╔═traceTransactionManagerTransactions╗                                                                               ║
+║║              b1 true               ║                                                                               ║
+║╚════════════════════════════════════╝                                                                               ║
+╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {


[plc4x] 05/05: fix(plc4go/spi): fix race issues in request transaction

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 0458529ee1acfda55ae7ffaf75d6c95045a06fea
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 15:49:48 2023 +0200

    fix(plc4go/spi): fix race issues in request transaction
---
 plc4go/spi/transactions/RequestTransaction.go      |  25 +++-
 .../spi/transactions/RequestTransactionManager.go  |   6 +-
 .../transactions/RequestTransactionManager_test.go |  26 ++--
 plc4go/spi/transactions/RequestTransaction_test.go | 154 +++++++++++----------
 .../transactions/requestTransaction_plc4xgen.go    |   7 +-
 5 files changed, 121 insertions(+), 97 deletions(-)

diff --git a/plc4go/spi/transactions/RequestTransaction.go b/plc4go/spi/transactions/RequestTransaction.go
index 18f88c7293..760a0d6a4c 100644
--- a/plc4go/spi/transactions/RequestTransaction.go
+++ b/plc4go/spi/transactions/RequestTransaction.go
@@ -22,12 +22,15 @@ package transactions
 import (
 	"context"
 	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
 	"github.com/apache/plc4x/plc4go/spi/pool"
+
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
-	"sync"
-	"time"
 )
 
 // RequestTransaction represents a transaction
@@ -58,7 +61,7 @@ type requestTransaction struct {
 
 	/** The initial operation to perform to kick off the request */
 	operation        pool.Runnable `ignore:"true"` // TODO: maybe we can treat this as a function some day if we are able to check the definition in gen
-	completionFuture pool.CompletionFuture
+	completionFuture atomic.Pointer[pool.CompletionFuture]
 
 	stateChangeMutex sync.Mutex
 	completed        bool
@@ -66,6 +69,18 @@ type requestTransaction struct {
 	transactionLog zerolog.Logger `ignore:"true"`
 }
 
+func (t *requestTransaction) setCompletionFuture(completionFuture pool.CompletionFuture) {
+	t.completionFuture.Store(&completionFuture)
+}
+
+func (t *requestTransaction) getCompletionFuture() pool.CompletionFuture {
+	completionFutureLoaded := t.completionFuture.Load()
+	if completionFutureLoaded == nil {
+		return nil
+	}
+	return *completionFutureLoaded
+}
+
 //
 // Internal section
 //
@@ -118,14 +133,14 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
 	t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Awaiting completion")
 	timeout, cancelFunc := context.WithTimeout(ctx, time.Minute*30) // This is intentionally set very high
 	defer cancelFunc()
-	for t.completionFuture == nil {
+	for t.getCompletionFuture() == nil {
 		time.Sleep(time.Millisecond * 10)
 		if err := timeout.Err(); err != nil {
 			log.Error().Msg("Timout after a long time. This means something is very of here")
 			return errors.Wrap(err, "Error waiting for completion future to be set")
 		}
 	}
-	if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
+	if err := t.getCompletionFuture().AwaitCompletion(ctx); err != nil {
 		t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Errored")
 		return err
 	}
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 83551e0805..9fc80d6f4d 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -169,7 +169,7 @@ func (r *requestTransactionManager) processWorklog() {
 		r.log.Debug().Msgf("Handling next\n%v\n. (Adding to running requests (length: %d))", next, len(r.runningRequests))
 		r.runningRequests = append(r.runningRequests, next)
 		completionFuture := r.executor.Submit(context.Background(), next.transactionId, next.operation)
-		next.completionFuture = completionFuture
+		next.setCompletionFuture(completionFuture)
 		r.workLog.Remove(front)
 	}
 }
@@ -190,7 +190,7 @@ func (r *requestTransactionManager) StartTransaction() RequestTransaction {
 	}
 	if r.shutdown.Load() {
 		transaction.completed = true
-		transaction.completionFuture = &completedFuture{errors.New("request transaction manager in shutdown")}
+		transaction.setCompletionFuture(&completedFuture{errors.New("request transaction manager in shutdown")})
 	}
 	return transaction
 }
@@ -203,7 +203,7 @@ func (r *requestTransactionManager) getNumberOfActiveRequests() int {
 
 func (r *requestTransactionManager) failRequest(transaction *requestTransaction, err error) error {
 	// Try to fail it!
-	transaction.completionFuture.Cancel(true, err)
+	transaction.getCompletionFuture().Cancel(true, err)
 	// End it
 	return r.endRequest(transaction)
 }
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 63041083ea..b80299c202 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -280,10 +280,11 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 				transaction: &requestTransaction{},
 			},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
-				completionFuture := NewMockCompletionFuture(t)
-				expect := completionFuture.EXPECT()
+				completionFutureMock := NewMockCompletionFuture(t)
+				expect := completionFutureMock.EXPECT()
 				expect.Cancel(true, nil).Return()
-				args.transaction.completionFuture = completionFuture
+				var completionFuture pool.CompletionFuture = completionFutureMock
+				args.transaction.completionFuture.Store(&completionFuture)
 			},
 			wantErr: true,
 		},
@@ -374,14 +375,17 @@ func Test_requestTransactionManager_processWorklog(t *testing.T) {
 				numberOfConcurrentRequests: 100,
 				workLog: func() list.List {
 					l := list.New()
-					l.PushBack(&requestTransaction{
-						transactionId:    1,
-						completionFuture: NewMockCompletionFuture(t),
-					})
-					l.PushBack(&requestTransaction{
-						transactionId:    2,
-						completionFuture: NewMockCompletionFuture(t),
-					})
+					var completionFuture pool.CompletionFuture = NewMockCompletionFuture(t)
+					r1 := &requestTransaction{
+						transactionId: 1,
+					}
+					r1.completionFuture.Store(&completionFuture)
+					l.PushBack(r1)
+					r2 := &requestTransaction{
+						transactionId: 2,
+					}
+					r2.completionFuture.Store(&completionFuture)
+					l.PushBack(r2)
 					return *l
 				}(),
 				executor: sharedExecutorInstance,
diff --git a/plc4go/spi/transactions/RequestTransaction_test.go b/plc4go/spi/transactions/RequestTransaction_test.go
index 4288d703ba..00c68fe1f9 100644
--- a/plc4go/spi/transactions/RequestTransaction_test.go
+++ b/plc4go/spi/transactions/RequestTransaction_test.go
@@ -35,11 +35,10 @@ import (
 
 func Test_requestTransaction_EndRequest(t1 *testing.T) {
 	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
-		completed        bool
+		parent        *requestTransactionManager
+		transactionId int32
+		operation     pool.Runnable
+		completed     bool
 	}
 	tests := []struct {
 		name    string
@@ -65,12 +64,11 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
 	for _, tt := range tests {
 		t1.Run(tt.name, func(t1 *testing.T) {
 			t := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   testutils.ProduceTestingLogger(t1),
-				completed:        tt.fields.completed,
+				parent:         tt.fields.parent,
+				transactionId:  tt.fields.transactionId,
+				operation:      tt.fields.operation,
+				transactionLog: testutils.ProduceTestingLogger(t1),
+				completed:      tt.fields.completed,
 			}
 			if err := t.EndRequest(); (err != nil) != tt.wantErr {
 				t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr)
@@ -81,33 +79,34 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
 
 func Test_requestTransaction_FailRequest(t1 *testing.T) {
 	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
-		transactionLog   zerolog.Logger
-		completed        bool
+		parent         *requestTransactionManager
+		transactionId  int32
+		operation      pool.Runnable
+		transactionLog zerolog.Logger
+		completed      bool
 	}
 	type args struct {
 		err error
 	}
 	tests := []struct {
-		name      string
-		fields    fields
-		args      args
-		mockSetup func(t *testing.T, fields *fields, args *args)
-		wantErr   assert.ErrorAssertionFunc
+		name        string
+		fields      fields
+		args        args
+		mockSetup   func(t *testing.T, fields *fields, args *args)
+		manipulator func(t *testing.T, transaction *requestTransaction)
+		wantErr     assert.ErrorAssertionFunc
 	}{
 		{
 			name: "just fail it",
 			fields: fields{
 				parent: &requestTransactionManager{},
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
-				completionFuture := NewMockCompletionFuture(t)
-				expect := completionFuture.EXPECT()
+			manipulator: func(t *testing.T, transaction *requestTransaction) {
+				completionFutureMock := NewMockCompletionFuture(t)
+				expect := completionFutureMock.EXPECT()
 				expect.Cancel(true, nil).Return()
-				fields.completionFuture = completionFuture
+				var completionFuture pool.CompletionFuture = completionFutureMock
+				transaction.completionFuture.Store(&completionFuture)
 			},
 			wantErr: assert.Error,
 		},
@@ -129,32 +128,37 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
 				tt.mockSetup(t, &tt.fields, &tt.args)
 			}
 			r := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   tt.fields.transactionLog,
-				completed:        tt.fields.completed,
+				parent:         tt.fields.parent,
+				transactionId:  tt.fields.transactionId,
+				operation:      tt.fields.operation,
+				transactionLog: tt.fields.transactionLog,
+				completed:      tt.fields.completed,
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, r)
 			}
 			tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err)
 		})
 	}
 }
 
-func Test_requestTransaction_String(t1 *testing.T) {
+func Test_requestTransaction_String(t *testing.T) {
 	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
+		parent        *requestTransactionManager
+		transactionId int32
+		operation     pool.Runnable
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		want   string
+		name        string
+		fields      fields
+		manipulator func(t *testing.T, transaction *requestTransaction)
+		want        string
 	}{
 		{
 			name: "give a string",
+			manipulator: func(t *testing.T, transaction *requestTransaction) {
+				transaction.setCompletionFuture(nil)
+			},
 			want: `
 ╔═requestTransaction═════════╗
 ║╔═transactionId╗╔═completed╗║
@@ -164,15 +168,17 @@ func Test_requestTransaction_String(t1 *testing.T) {
 		},
 	}
 	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
-			t := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   testutils.ProduceTestingLogger(t1),
+		t.Run(tt.name, func(t1 *testing.T) {
+			_t := &requestTransaction{
+				parent:         tt.fields.parent,
+				transactionId:  tt.fields.transactionId,
+				operation:      tt.fields.operation,
+				transactionLog: testutils.ProduceTestingLogger(t1),
 			}
-			if got := t.String(); got != tt.want {
+			if tt.manipulator != nil {
+				tt.manipulator(t, _t)
+			}
+			if got := _t.String(); got != tt.want {
 				t1.Errorf("String() = \n%v, want \n%v", got, tt.want)
 			}
 		})
@@ -181,12 +187,11 @@ func Test_requestTransaction_String(t1 *testing.T) {
 
 func Test_requestTransaction_Submit(t1 *testing.T) {
 	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
-		transactionLog   zerolog.Logger
-		completed        bool
+		parent         *requestTransactionManager
+		transactionId  int32
+		operation      pool.Runnable
+		transactionLog zerolog.Logger
+		completed      bool
 	}
 	type args struct {
 		operation RequestTransactionRunnable
@@ -240,12 +245,11 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 	for _, tt := range tests {
 		t1.Run(tt.name, func(t1 *testing.T) {
 			t := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   tt.fields.transactionLog,
-				completed:        tt.fields.completed,
+				parent:         tt.fields.parent,
+				transactionId:  tt.fields.transactionId,
+				operation:      tt.fields.operation,
+				transactionLog: tt.fields.transactionLog,
+				completed:      tt.fields.completed,
 			}
 			t.Submit(tt.args.operation)
 			t.operation()
@@ -255,10 +259,9 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 
 func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
 	type fields struct {
-		parent           *requestTransactionManager
-		transactionId    int32
-		operation        pool.Runnable
-		completionFuture pool.CompletionFuture
+		parent        *requestTransactionManager
+		transactionId int32
+		operation     pool.Runnable
 	}
 	type args struct {
 		ctx context.Context
@@ -285,13 +288,12 @@ func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
 					return ctx
 				}(),
 			},
-			mockSetup: func(t *testing.T, fields *fields, args *args) {
-				completionFuture := NewMockCompletionFuture(t)
-				expect := completionFuture.EXPECT()
-				expect.AwaitCompletion(mock.Anything).Return(nil)
-				fields.completionFuture = completionFuture
-			},
 			manipulator: func(t *testing.T, transaction *requestTransaction) {
+				completionFutureMock := NewMockCompletionFuture(t)
+				expect := completionFutureMock.EXPECT()
+				expect.AwaitCompletion(mock.Anything).Return(nil)
+				var completionFuture pool.CompletionFuture = completionFutureMock
+				transaction.completionFuture.Store(&completionFuture)
 				go func() {
 					time.Sleep(100 * time.Millisecond)
 					r := transaction.parent
@@ -308,11 +310,13 @@ func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
 				tt.mockSetup(t1, &tt.fields, &tt.args)
 			}
 			t := &requestTransaction{
-				parent:           tt.fields.parent,
-				transactionId:    tt.fields.transactionId,
-				operation:        tt.fields.operation,
-				completionFuture: tt.fields.completionFuture,
-				transactionLog:   testutils.ProduceTestingLogger(t1),
+				parent:         tt.fields.parent,
+				transactionId:  tt.fields.transactionId,
+				operation:      tt.fields.operation,
+				transactionLog: testutils.ProduceTestingLogger(t1),
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t1, t)
 			}
 			if err := t.AwaitCompletion(tt.args.ctx); (err != nil) != tt.wantErr {
 				t1.Errorf("AwaitCompletion() error = %v, wantErr %v", err, tt.wantErr)
diff --git a/plc4go/spi/transactions/requestTransaction_plc4xgen.go b/plc4go/spi/transactions/requestTransaction_plc4xgen.go
index d6dd6afa75..733c59fde4 100644
--- a/plc4go/spi/transactions/requestTransaction_plc4xgen.go
+++ b/plc4go/spi/transactions/requestTransaction_plc4xgen.go
@@ -47,8 +47,9 @@ func (d *requestTransaction) SerializeWithWriteBuffer(ctx context.Context, write
 		return err
 	}
 
-	if d.completionFuture != nil {
-		if serializableField, ok := d.completionFuture.(utils.Serializable); ok {
+	if completionFutureLoaded := d.completionFuture.Load(); completionFutureLoaded != nil && *completionFutureLoaded != nil {
+		completionFuture := *completionFutureLoaded
+		if serializableField, ok := completionFuture.(utils.Serializable); ok {
 			if err := writeBuffer.PushContext("completionFuture"); err != nil {
 				return err
 			}
@@ -59,7 +60,7 @@ func (d *requestTransaction) SerializeWithWriteBuffer(ctx context.Context, write
 				return err
 			}
 		} else {
-			stringValue := fmt.Sprintf("%v", d.completionFuture)
+			stringValue := fmt.Sprintf("%v", completionFuture)
 			if err := writeBuffer.WriteString("completionFuture", uint32(len(stringValue)*8), "UTF-8", stringValue); err != nil {
 				return err
 			}