You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 13:50:55 UTC
[plc4x] branch develop updated (6dac19d75e -> 0458529ee1)
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
from 6dac19d75e test(plc4go/cbus): shutdown workers of discoverer once done
new 5e51e6606d fix(plc4go/spi): fix race issues in worker pool
new 915b6b5408 feat(plc4go/tools): add support for atomic.Pointer to plc4xgenerator
new 7089ac3b58 fix(plc4go/tools): fix atomic.Pointer support
new dcf630aa84 fix(plc4go/spi): fix worker logging on wrong logger
new 0458529ee1 fix(plc4go/spi): fix race issues in request transaction
The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
plc4go/internal/cbus/Browser_test.go | 3 +-
plc4go/internal/cbus/CBusMessageMapper_test.go | 2 +-
plc4go/spi/pool/dynamicExecutor.go | 5 +-
plc4go/spi/pool/dynamicExecutor_test.go | 36 ++---
plc4go/spi/pool/executor.go | 11 +-
plc4go/spi/pool/executor_test.go | 27 ++--
plc4go/spi/pool/worker.go | 64 ++++++---
plc4go/spi/pool/worker_plc4xgen.go | 34 ++---
plc4go/spi/pool/worker_test.go | 137 ++++++++++++++++--
plc4go/spi/transactions/RequestTransaction.go | 25 +++-
.../spi/transactions/RequestTransactionManager.go | 6 +-
.../transactions/RequestTransactionManager_test.go | 72 +++++-----
plc4go/spi/transactions/RequestTransaction_test.go | 154 +++++++++++----------
.../transactions/requestTransaction_plc4xgen.go | 7 +-
plc4go/tools/plc4xgenerator/gen.go | 34 +++++
15 files changed, 406 insertions(+), 211 deletions(-)
[plc4x] 03/05: fix(plc4go/tools): fix atomic.Pointer support
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 7089ac3b584ba7fd776905c8ed29d9eaa4a59eb4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 15:45:00 2023 +0200
fix(plc4go/tools): fix atomic.Pointer support
---
plc4go/tools/plc4xgenerator/gen.go | 24 +++++++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/plc4go/tools/plc4xgenerator/gen.go b/plc4go/tools/plc4xgenerator/gen.go
index 3644e4ceca..54ceab6101 100644
--- a/plc4go/tools/plc4xgenerator/gen.go
+++ b/plc4go/tools/plc4xgenerator/gen.go
@@ -284,7 +284,7 @@ func (g *Generator) generate(typeName string) {
xIdent, xIsIdent := fieldType.X.(*ast.Ident)
sel := fieldType.Sel
if xIsIdent && xIdent.Name == "atomic" && sel.Name == "Pointer" {
- g.Printf(serializableFieldTemplate, "(*d."+field.name+".Load())", fieldNameUntitled)
+ g.Printf(atomicPointerFieldTemplate, "d."+field.name, field.name, fieldNameUntitled)
continue
}
}
@@ -556,6 +556,28 @@ var serializableFieldTemplate = `
}
`
+var atomicPointerFieldTemplate = `
+ if %[2]sLoaded :=%[1]s.Load(); %[2]sLoaded != nil && *%[2]sLoaded != nil {
+ %[2]s := *%[2]sLoaded
+ if serializableField, ok := %[2]s.(utils.Serializable); ok {
+ if err := writeBuffer.PushContext(%[3]s); err != nil {
+ return err
+ }
+ if err := serializableField.SerializeWithWriteBuffer(ctx, writeBuffer); err != nil {
+ return err
+ }
+ if err := writeBuffer.PopContext(%[3]s); err != nil {
+ return err
+ }
+ } else {
+ stringValue := fmt.Sprintf("%%v", %[2]s)
+ if err := writeBuffer.WriteString(%[3]s, uint32(len(stringValue)*8), "UTF-8", stringValue); err != nil {
+ return err
+ }
+ }
+ }
+`
+
var byteFieldSerialize = `
if err := writeBuffer.WriteByte(%[2]s, %[1]s); err != nil {
return err
[plc4x] 04/05: fix(plc4go/spi): fix worker logging on wrong logger
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit dcf630aa840b03e44549d88395535b5fbe613d6b
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 15:48:41 2023 +0200
fix(plc4go/spi): fix worker logging on wrong logger
---
plc4go/spi/pool/worker.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 7b1308de73..6017f6c78c 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -74,7 +74,7 @@ func (w *worker) stop(interrupt bool) {
w.stateChange.Lock()
defer w.stateChange.Unlock()
if !w.running.Load() {
- log.Warn().Msg("Worker not running")
+ w.log.Warn().Msg("Worker not running")
return
}
w.shutdown.Store(true)
[plc4x] 02/05: feat(plc4go/tools): add support for atomic.Pointer to plc4xgenerator
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 915b6b54089bcd157ac9103f460b85dd1e0a241d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 15:28:07 2023 +0200
feat(plc4go/tools): add support for atomic.Pointer to plc4xgenerator
---
plc4go/tools/plc4xgenerator/gen.go | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/plc4go/tools/plc4xgenerator/gen.go b/plc4go/tools/plc4xgenerator/gen.go
index 1e4fe23f60..3644e4ceca 100644
--- a/plc4go/tools/plc4xgenerator/gen.go
+++ b/plc4go/tools/plc4xgenerator/gen.go
@@ -278,6 +278,18 @@ func (g *Generator) generate(typeName string) {
}
}
g.Printf(serializableFieldTemplate, "d."+field.name, fieldNameUntitled)
+ case *ast.IndexExpr:
+ x := fieldType.X
+ if fieldType, isxFieldSelector := x.(*ast.SelectorExpr); isxFieldSelector { // TODO: we need to refactor this so we can reuse...
+ xIdent, xIsIdent := fieldType.X.(*ast.Ident)
+ sel := fieldType.Sel
+ if xIsIdent && xIdent.Name == "atomic" && sel.Name == "Pointer" {
+ g.Printf(serializableFieldTemplate, "(*d."+field.name+".Load())", fieldNameUntitled)
+ continue
+ }
+ }
+ fmt.Printf("no support yet for %#q\n", fieldType)
+ continue
case *ast.Ident:
switch fieldType.Name {
case "byte":
[plc4x] 01/05: fix(plc4go/spi): fix race issues in worker pool
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 5e51e6606dc9fe58d8324a5e2e102e0c4537aeb4
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 14:18:31 2023 +0200
fix(plc4go/spi): fix race issues in worker pool
---
plc4go/internal/cbus/Browser_test.go | 3 +-
plc4go/internal/cbus/CBusMessageMapper_test.go | 2 +-
plc4go/spi/pool/dynamicExecutor.go | 5 +-
plc4go/spi/pool/dynamicExecutor_test.go | 36 +++---
plc4go/spi/pool/executor.go | 11 +-
plc4go/spi/pool/executor_test.go | 27 ++--
plc4go/spi/pool/worker.go | 64 +++++++---
plc4go/spi/pool/worker_plc4xgen.go | 34 ++---
plc4go/spi/pool/worker_test.go | 137 +++++++++++++++++++--
.../transactions/RequestTransactionManager_test.go | 46 +++----
10 files changed, 251 insertions(+), 114 deletions(-)
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 501eb7371a..3fef9b7ee8 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -162,8 +162,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
})
connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
if err := connectionConnectResult.GetErr(); err != nil {
- t.Error(err)
- t.FailNow()
+ t.Fatal(err)
}
fields.connection = connectionConnectResult.GetConnection()
t.Cleanup(func() {
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 1bacc4775b..588ba63d81 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1479,7 +1479,7 @@ func TestMapEncodedReply(t *testing.T) {
assert.NoError(t, transactionManager.Close())
})
transaction := transactionManager.StartTransaction()
- t.Logf("Submitting No-Op to transaction %v", transaction)
+ t.Logf("Submitting No-Op to transaction\n%v", transaction)
transaction.Submit(func(transaction transactions.RequestTransaction) {
// NO-OP
})
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 7a1b7d5381..e2cf3afb56 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -91,7 +91,7 @@ func (e *dynamicExecutor) Start() {
e.worker = append(e.worker, _worker)
_worker.initialize()
workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
- go _worker.work()
+ _worker.start()
e.currentNumberOfWorkers.Add(1)
} else {
workerLog.Trace().Msg("Nothing to scale")
@@ -133,8 +133,7 @@ func (e *dynamicExecutor) Start() {
workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived.Load(), deadline)
if _worker.lastReceived.Load().(time.Time).Before(deadline) {
workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
- _worker.interrupted.Store(true)
- close(_worker.interrupter)
+ _worker.stop(true)
e.currentNumberOfWorkers.Add(-1)
} else {
workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 107486eb52..e58ffd0f13 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -198,24 +198,24 @@ func Test_dynamicExecutor_String(t *testing.T) {
maxNumberOfWorkers: 3,
},
want: `
-╔═dynamicExecutor═══════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═executor════════════════════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═running╗╔═shutdown╗ ║║
-║║║b0 false║║b0 false ║ ║║
-║║╚════════╝╚═════════╝ ║║
-║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║║
-║║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║║
-║║║║0x0000000000000000 0║║b0 false ║║ b0 false ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║║
-║║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║║
-║║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║║
-║║║0x0000000000000000 0║║0 element(s)║║ b0 false ║ ║║
-║║╚════════════════════╝╚════════════╝╚═════════════╝ ║║
-║╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗ ║
-║║0x0000000000000003 3║║ 0x00000000 0 ║║0 element(s)║ ║
-║╚════════════════════╝╚═══════════════════════╝╚════════════╝ ║
-╚═══════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═dynamicExecutor══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═running╗╔═shutdown╗ ║║
+║║║b0 false║║b0 false ║ ║║
+║║╚════════╝╚═════════╝ ║║
+║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║║
+║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║║
+║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║║
+║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║
+║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
+║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║║
+║║║0x0000000000000000 0║║0 element(s)║║ b0 false ║ ║║
+║║╚════════════════════╝╚════════════╝╚═════════════╝ ║║
+║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗ ║
+║║0x0000000000000003 3║║ 0x00000000 0 ║║0 element(s)║ ║
+║╚════════════════════╝╚═══════════════════════╝╚════════════╝ ║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 654edf2973..794bfb4221 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -93,9 +93,9 @@ func (e *executor) Start() {
e.running = true
e.shutdown = false
for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.initialize()
- go worker.work()
+ _worker := e.worker[i]
+ _worker.initialize()
+ _worker.start()
}
}
@@ -109,10 +109,7 @@ func (e *executor) Stop() {
}
e.shutdown = true
for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.shutdown.Store(true)
- worker.interrupted.Store(true)
- close(worker.interrupter)
+ e.worker[i].stop(true)
}
e.running = false
e.shutdown = false
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index ee126bb989..15ecac74ca 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -434,7 +434,6 @@ func Test_executor_String(t *testing.T) {
id: 1,
shutdown: atomic.Bool{},
interrupted: atomic.Bool{},
- hasEnded: atomic.Bool{},
lastReceived: func() atomic.Value {
value := atomic.Value{}
value.Store(time.Time{})
@@ -446,19 +445,19 @@ func Test_executor_String(t *testing.T) {
traceWorkers: true,
},
want: `
-╔═executor════════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═running╗╔═shutdown╗ ║
-║║b1 true ║║ b1 true ║ ║
-║╚════════╝╚═════════╝ ║
-║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║
-║║║0x0000000000000001 1║║b0 false ║║ b0 false ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║
-║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║
-║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║
-║║0x0000000000000002 2║║0 element(s)║║ b1 true ║ ║
-║╚════════════════════╝╚════════════╝╚═════════════╝ ║
-╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═running╗╔═shutdown╗ ║
+║║b1 true ║║ b1 true ║ ║
+║╚════════╝╚═════════╝ ║
+║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║
+║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║
+║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║
+║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║
+║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
+║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║
+║║0x0000000000000002 2║║0 element(s)║║ b1 true ║ ║
+║╚════════════════════╝╚════════════╝╚═════════════╝ ║
+╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 0fa5ae597b..7b1308de73 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -20,6 +20,7 @@
package pool
import (
+ "github.com/rs/zerolog/log"
"runtime/debug"
"sync"
"sync/atomic"
@@ -30,18 +31,21 @@ import (
//go:generate go run ../../tools/plc4xgenerator/gen.go -type=worker
type worker struct {
- id int
- shutdown atomic.Bool
- interrupted atomic.Bool
- interrupter chan struct{}
- executor interface {
+ id int
+ executor interface {
isTraceWorkers() bool
getWorksItems() chan workItem
getWorkerWaitGroup() *sync.WaitGroup
}
- hasEnded atomic.Bool
+
lastReceived atomic.Value
+ stateChange sync.Mutex
+ running atomic.Bool
+ shutdown atomic.Bool
+ interrupted atomic.Bool
+ interrupter chan struct{}
+
log zerolog.Logger `ignore:"true"`
}
@@ -49,32 +53,57 @@ func (w *worker) initialize() {
w.shutdown.Store(false)
w.interrupted.Store(false)
w.interrupter = make(chan struct{}, 1)
- w.hasEnded.Store(false)
w.lastReceived.Store(time.Now())
}
-func (w *worker) work() {
+func (w *worker) start() {
+ w.stateChange.Lock()
+ defer w.stateChange.Unlock()
+ if w.running.Load() {
+ log.Warn().Msg("Worker already started")
+ return
+ }
+ if w.executor.isTraceWorkers() {
+ w.log.Debug().Msgf("Starting worker\n%s", w)
+ }
w.executor.getWorkerWaitGroup().Add(1)
+ go w.work()
+}
+
+func (w *worker) stop(interrupt bool) {
+ w.stateChange.Lock()
+ defer w.stateChange.Unlock()
+ if !w.running.Load() {
+ log.Warn().Msg("Worker not running")
+ return
+ }
+ w.shutdown.Store(true)
+ if interrupt {
+ w.interrupted.Store(true)
+ close(w.interrupter)
+ }
+}
+
+func (w *worker) work() {
defer w.executor.getWorkerWaitGroup().Done()
defer func() {
if err := recover(); err != nil {
w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
- }
- if !w.shutdown.Load() {
- // if we are not in shutdown we continue
- w.work()
+ if !w.shutdown.Load() {
+ // if we are not in shutdown we continue
+ w.start()
+ }
}
}()
+ w.running.Store(true)
+ defer w.running.Store(false)
workerLog := w.log.With().Int("Worker id", w.id).Logger()
if !w.executor.isTraceWorkers() {
workerLog = zerolog.Nop()
}
- workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
- w.hasEnded.Store(false)
- workerLog.Debug().Msgf("setting to not ended")
for !w.shutdown.Load() {
- workerLog.Debug().Msg("Working")
+ workerLog.Trace().Msg("Working")
select {
case _workItem := <-w.executor.getWorksItems():
w.lastReceived.Store(time.Now())
@@ -92,6 +121,5 @@ func (w *worker) work() {
workerLog.Debug().Msg("We got interrupted")
}
}
- w.hasEnded.Store(true)
- workerLog.Debug().Msg("setting to ended")
+ workerLog.Trace().Msg("done")
}
diff --git a/plc4go/spi/pool/worker_plc4xgen.go b/plc4go/spi/pool/worker_plc4xgen.go
index 23e8d1689c..9f132d3036 100644
--- a/plc4go/spi/pool/worker_plc4xgen.go
+++ b/plc4go/spi/pool/worker_plc4xgen.go
@@ -47,23 +47,6 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
return err
}
- if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
- return err
- }
-
- if err := writeBuffer.WriteBit("interrupted", d.interrupted.Load()); err != nil {
- return err
- }
-
- _interrupter_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.interrupter))
- if err := writeBuffer.WriteString("interrupter", uint32(len(_interrupter_plx4gen_description)*8), "UTF-8", _interrupter_plx4gen_description); err != nil {
- return err
- }
-
- if err := writeBuffer.WriteBit("hasEnded", d.hasEnded.Load()); err != nil {
- return err
- }
-
if d.lastReceived.Load() != nil {
if serializableField, ok := d.lastReceived.Load().(utils.Serializable); ok {
if err := writeBuffer.PushContext("lastReceived"); err != nil {
@@ -82,6 +65,23 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
}
}
}
+
+ if err := writeBuffer.WriteBit("running", d.running.Load()); err != nil {
+ return err
+ }
+
+ if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
+ return err
+ }
+
+ if err := writeBuffer.WriteBit("interrupted", d.interrupted.Load()); err != nil {
+ return err
+ }
+
+ _interrupter_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.interrupter))
+ if err := writeBuffer.WriteString("interrupter", uint32(len(_interrupter_plx4gen_description)*8), "UTF-8", _interrupter_plx4gen_description); err != nil {
+ return err
+ }
if err := writeBuffer.PopContext("worker"); err != nil {
return err
}
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 120b9548dc..d781466b43 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -20,7 +20,9 @@
package pool
import (
+ "github.com/rs/zerolog"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -58,6 +60,118 @@ func Test_worker_initialize(t *testing.T) {
}
}
+func Test_worker_start(t *testing.T) {
+ type fields struct {
+ id int
+ executor interface {
+ isTraceWorkers() bool
+ getWorksItems() chan workItem
+ getWorkerWaitGroup() *sync.WaitGroup
+ }
+ lastReceived atomic.Value
+ interrupter chan struct{}
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ manipulator func(t *testing.T, worker *worker)
+ }{
+ {
+ name: "start it",
+ fields: fields{
+ executor: func() *executor {
+ e := &executor{
+ workItems: make(chan workItem),
+ traceWorkers: true,
+ }
+ go func() {
+ e.workItems <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ // No-op
+ },
+ completionFuture: &future{},
+ }
+ }()
+ return e
+ }(),
+ },
+ },
+ {
+ name: "start started",
+ manipulator: func(t *testing.T, worker *worker) {
+ worker.running.Store(true)
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ w := &worker{
+ id: tt.fields.id,
+ executor: tt.fields.executor,
+ lastReceived: tt.fields.lastReceived,
+ interrupter: tt.fields.interrupter,
+ log: tt.fields.log,
+ }
+ if tt.manipulator != nil {
+ tt.manipulator(t, w)
+ }
+ w.start()
+ t.Cleanup(func() {
+ w.stop(false)
+ })
+ })
+ }
+}
+
+func Test_worker_stop(t *testing.T) {
+ type fields struct {
+ id int
+ executor interface {
+ isTraceWorkers() bool
+ getWorksItems() chan workItem
+ getWorkerWaitGroup() *sync.WaitGroup
+ }
+ lastReceived atomic.Value
+ interrupter chan struct{}
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ manipulator func(t *testing.T, worker *worker)
+ }{
+ {
+ name: "stop it",
+ },
+ {
+ name: "stop started",
+ fields: fields{
+ interrupter: make(chan struct{}),
+ },
+ manipulator: func(t *testing.T, worker *worker) {
+ worker.running.Store(true)
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ w := &worker{
+ id: tt.fields.id,
+ executor: tt.fields.executor,
+ lastReceived: tt.fields.lastReceived,
+ interrupter: tt.fields.interrupter,
+ log: tt.fields.log,
+ }
+ if tt.manipulator != nil {
+ tt.manipulator(t, w)
+ }
+ w.stop(true)
+ })
+ }
+}
+
func Test_worker_work(t *testing.T) {
type fields struct {
id int
@@ -96,7 +210,7 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeFirstValidation: 50 * time.Millisecond,
firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
+ assert.True(t, w.running.Load(), "should be running")
},
manipulator: func(w *worker) {
w.shutdown.Store(true)
@@ -104,7 +218,7 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeSecondValidation: 150 * time.Millisecond,
secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
+ assert.False(t, w.running.Load(), "should not be running")
},
},
{
@@ -130,14 +244,14 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeFirstValidation: 50 * time.Millisecond,
firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
+ assert.True(t, w.running.Load(), "should be running")
},
manipulator: func(w *worker) {
w.shutdown.Store(true)
},
timeBeforeSecondValidation: 150 * time.Millisecond,
secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
+ assert.False(t, w.running.Load(), "should not be running")
},
},
{
@@ -154,7 +268,7 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeFirstValidation: 50 * time.Millisecond,
firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
+ assert.True(t, w.running.Load(), "should be running")
},
manipulator: func(w *worker) {
w.shutdown.Store(true)
@@ -162,7 +276,7 @@ func Test_worker_work(t *testing.T) {
},
timeBeforeSecondValidation: 150 * time.Millisecond,
secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
+ assert.False(t, w.running.Load(), "should not be running")
},
},
{
@@ -203,6 +317,7 @@ func Test_worker_work(t *testing.T) {
executor: tt.fields.executor,
log: produceTestingLogger(t),
}
+ w.executor.getWorkerWaitGroup().Add(1)
go w.work()
if tt.firstValidation != nil {
time.Sleep(tt.timeBeforeFirstValidation)
@@ -238,11 +353,11 @@ func Test_worker_String(t *testing.T) {
{
name: "string it",
want: `
-╔═worker════════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║
-║║0x0000000000000000 0║║b0 false ║║ b0 false ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║
-║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║
-╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═worker═══════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║
+║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║
+║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index d858749a97..63041083ea 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -598,29 +598,29 @@ func Test_requestTransactionManager_String(t *testing.T) {
traceTransactionManagerTransactions: true,
},
want: `
-╔═requestTransactionManager════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗ ║
-║║ ╔═transactionId╗╔═completed╗ ║║ 0x0000000000000003 3 ║║ 0x00000004 4 ║ ║
-║║ ║ 0x00000002 2 ║║ b0 false ║ ║╚═══════════════════════════╝╚═════════════════════╝ ║
-║║ ╚══════════════╝╚══════════╝ ║ ║
-║╚═════════════════════════════════════════╝ ║
-║╔═executor/executor═══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
-║║╔═running╗╔═shutdown╗ ║║b0 false ║║
-║║║b0 false║║b0 false ║ ║╚═════════╝║
-║║╚════════╝╚═════════╝ ║ ║
-║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║ ║
-║║║╔═id═════════════════╗╔═shutdown╗╔═interrupted╗╔═interrupter╗╔═hasEnded╗╔═lastReceived════════════════╗║║ ║
-║║║║0x0000000000000000 0║║b0 false ║║ b0 false ║║0 element(s)║║b0 false ║║0001-01-01 00:00:00 +0000 UTC║║║ ║
-║║║╚════════════════════╝╚═════════╝╚════════════╝╚════════════╝╚═════════╝╚═════════════════════════════╝║║ ║
-║║╚═══════════════════════════════════════════════════════════════════════════════════════════════════════╝║ ║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║ ║
-║║║0x0000000000000001 1║║0 element(s)║║ b0 false ║ ║ ║
-║║╚════════════════════╝╚════════════╝╚═════════════╝ ║ ║
-║╚═════════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
-║╔═traceTransactionManagerTransactions╗ ║
-║║ b1 true ║ ║
-║╚════════════════════════════════════╝ ║
-╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═requestTransactionManager═══════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═runningRequests/value/requestTransaction╗╔═numberOfConcurrentRequests╗╔═currentTransactionId╗ ║
+║║ ╔═transactionId╗╔═completed╗ ║║ 0x0000000000000003 3 ║║ 0x00000004 4 ║ ║
+║║ ║ 0x00000002 2 ║║ b0 false ║ ║╚═══════════════════════════╝╚═════════════════════╝ ║
+║║ ╚══════════════╝╚══════════╝ ║ ║
+║╚═════════════════════════════════════════╝ ║
+║╔═executor/executor══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
+║║╔═running╗╔═shutdown╗ ║║b0 false ║║
+║║║b0 false║║b0 false ║ ║╚═════════╝║
+║║╚════════╝╚═════════╝ ║ ║
+║║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║ ║
+║║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║ ║
+║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║ ║
+║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║ ║
+║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║ ║
+║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║ ║
+║║║0x0000000000000001 1║║0 element(s)║║ b0 false ║ ║ ║
+║║╚════════════════════╝╚════════════╝╚═════════════╝ ║ ║
+║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
+║╔═traceTransactionManagerTransactions╗ ║
+║║ b1 true ║ ║
+║╚════════════════════════════════════╝ ║
+╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
[plc4x] 05/05: fix(plc4go/spi): fix race issues in request transaction
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 0458529ee1acfda55ae7ffaf75d6c95045a06fea
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 15:49:48 2023 +0200
fix(plc4go/spi): fix race issues in request transaction
---
plc4go/spi/transactions/RequestTransaction.go | 25 +++-
.../spi/transactions/RequestTransactionManager.go | 6 +-
.../transactions/RequestTransactionManager_test.go | 26 ++--
plc4go/spi/transactions/RequestTransaction_test.go | 154 +++++++++++----------
.../transactions/requestTransaction_plc4xgen.go | 7 +-
5 files changed, 121 insertions(+), 97 deletions(-)
diff --git a/plc4go/spi/transactions/RequestTransaction.go b/plc4go/spi/transactions/RequestTransaction.go
index 18f88c7293..760a0d6a4c 100644
--- a/plc4go/spi/transactions/RequestTransaction.go
+++ b/plc4go/spi/transactions/RequestTransaction.go
@@ -22,12 +22,15 @@ package transactions
import (
"context"
"fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
"github.com/apache/plc4x/plc4go/spi/pool"
+
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
- "sync"
- "time"
)
// RequestTransaction represents a transaction
@@ -58,7 +61,7 @@ type requestTransaction struct {
/** The initial operation to perform to kick off the request */
operation pool.Runnable `ignore:"true"` // TODO: maybe we can treat this as a function some day if we are able to check the definition in gen
- completionFuture pool.CompletionFuture
+ completionFuture atomic.Pointer[pool.CompletionFuture]
stateChangeMutex sync.Mutex
completed bool
@@ -66,6 +69,18 @@ type requestTransaction struct {
transactionLog zerolog.Logger `ignore:"true"`
}
+func (t *requestTransaction) setCompletionFuture(completionFuture pool.CompletionFuture) {
+ t.completionFuture.Store(&completionFuture)
+}
+
+func (t *requestTransaction) getCompletionFuture() pool.CompletionFuture {
+ completionFutureLoaded := t.completionFuture.Load()
+ if completionFutureLoaded == nil {
+ return nil
+ }
+ return *completionFutureLoaded
+}
+
//
// Internal section
//
@@ -118,14 +133,14 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Awaiting completion")
timeout, cancelFunc := context.WithTimeout(ctx, time.Minute*30) // This is intentionally set very high
defer cancelFunc()
- for t.completionFuture == nil {
+ for t.getCompletionFuture() == nil {
time.Sleep(time.Millisecond * 10)
if err := timeout.Err(); err != nil {
log.Error().Msg("Timout after a long time. This means something is very of here")
return errors.Wrap(err, "Error waiting for completion future to be set")
}
}
- if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
+ if err := t.getCompletionFuture().AwaitCompletion(ctx); err != nil {
t.transactionLog.Trace().Int32("transactionId", t.transactionId).Msg("Errored")
return err
}
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 83551e0805..9fc80d6f4d 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -169,7 +169,7 @@ func (r *requestTransactionManager) processWorklog() {
r.log.Debug().Msgf("Handling next\n%v\n. (Adding to running requests (length: %d))", next, len(r.runningRequests))
r.runningRequests = append(r.runningRequests, next)
completionFuture := r.executor.Submit(context.Background(), next.transactionId, next.operation)
- next.completionFuture = completionFuture
+ next.setCompletionFuture(completionFuture)
r.workLog.Remove(front)
}
}
@@ -190,7 +190,7 @@ func (r *requestTransactionManager) StartTransaction() RequestTransaction {
}
if r.shutdown.Load() {
transaction.completed = true
- transaction.completionFuture = &completedFuture{errors.New("request transaction manager in shutdown")}
+ transaction.setCompletionFuture(&completedFuture{errors.New("request transaction manager in shutdown")})
}
return transaction
}
@@ -203,7 +203,7 @@ func (r *requestTransactionManager) getNumberOfActiveRequests() int {
func (r *requestTransactionManager) failRequest(transaction *requestTransaction, err error) error {
// Try to fail it!
- transaction.completionFuture.Cancel(true, err)
+ transaction.getCompletionFuture().Cancel(true, err)
// End it
return r.endRequest(transaction)
}
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 63041083ea..b80299c202 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -280,10 +280,11 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
transaction: &requestTransaction{},
},
mockSetup: func(t *testing.T, fields *fields, args *args) {
- completionFuture := NewMockCompletionFuture(t)
- expect := completionFuture.EXPECT()
+ completionFutureMock := NewMockCompletionFuture(t)
+ expect := completionFutureMock.EXPECT()
expect.Cancel(true, nil).Return()
- args.transaction.completionFuture = completionFuture
+ var completionFuture pool.CompletionFuture = completionFutureMock
+ args.transaction.completionFuture.Store(&completionFuture)
},
wantErr: true,
},
@@ -374,14 +375,17 @@ func Test_requestTransactionManager_processWorklog(t *testing.T) {
numberOfConcurrentRequests: 100,
workLog: func() list.List {
l := list.New()
- l.PushBack(&requestTransaction{
- transactionId: 1,
- completionFuture: NewMockCompletionFuture(t),
- })
- l.PushBack(&requestTransaction{
- transactionId: 2,
- completionFuture: NewMockCompletionFuture(t),
- })
+ var completionFuture pool.CompletionFuture = NewMockCompletionFuture(t)
+ r1 := &requestTransaction{
+ transactionId: 1,
+ }
+ r1.completionFuture.Store(&completionFuture)
+ l.PushBack(r1)
+ r2 := &requestTransaction{
+ transactionId: 2,
+ }
+ r2.completionFuture.Store(&completionFuture)
+ l.PushBack(r2)
return *l
}(),
executor: sharedExecutorInstance,
diff --git a/plc4go/spi/transactions/RequestTransaction_test.go b/plc4go/spi/transactions/RequestTransaction_test.go
index 4288d703ba..00c68fe1f9 100644
--- a/plc4go/spi/transactions/RequestTransaction_test.go
+++ b/plc4go/spi/transactions/RequestTransaction_test.go
@@ -35,11 +35,10 @@ import (
func Test_requestTransaction_EndRequest(t1 *testing.T) {
type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
- completed bool
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
+ completed bool
}
tests := []struct {
name string
@@ -65,12 +64,11 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: testutils.ProduceTestingLogger(t1),
- completed: tt.fields.completed,
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ transactionLog: testutils.ProduceTestingLogger(t1),
+ completed: tt.fields.completed,
}
if err := t.EndRequest(); (err != nil) != tt.wantErr {
t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr)
@@ -81,33 +79,34 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
func Test_requestTransaction_FailRequest(t1 *testing.T) {
type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
- transactionLog zerolog.Logger
- completed bool
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
+ transactionLog zerolog.Logger
+ completed bool
}
type args struct {
err error
}
tests := []struct {
- name string
- fields fields
- args args
- mockSetup func(t *testing.T, fields *fields, args *args)
- wantErr assert.ErrorAssertionFunc
+ name string
+ fields fields
+ args args
+ mockSetup func(t *testing.T, fields *fields, args *args)
+ manipulator func(t *testing.T, transaction *requestTransaction)
+ wantErr assert.ErrorAssertionFunc
}{
{
name: "just fail it",
fields: fields{
parent: &requestTransactionManager{},
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
- completionFuture := NewMockCompletionFuture(t)
- expect := completionFuture.EXPECT()
+ manipulator: func(t *testing.T, transaction *requestTransaction) {
+ completionFutureMock := NewMockCompletionFuture(t)
+ expect := completionFutureMock.EXPECT()
expect.Cancel(true, nil).Return()
- fields.completionFuture = completionFuture
+ var completionFuture pool.CompletionFuture = completionFutureMock
+ transaction.completionFuture.Store(&completionFuture)
},
wantErr: assert.Error,
},
@@ -129,32 +128,37 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
tt.mockSetup(t, &tt.fields, &tt.args)
}
r := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: tt.fields.transactionLog,
- completed: tt.fields.completed,
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ transactionLog: tt.fields.transactionLog,
+ completed: tt.fields.completed,
+ }
+ if tt.manipulator != nil {
+ tt.manipulator(t, r)
}
tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err)
})
}
}
-func Test_requestTransaction_String(t1 *testing.T) {
+func Test_requestTransaction_String(t *testing.T) {
type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
}
tests := []struct {
- name string
- fields fields
- want string
+ name string
+ fields fields
+ manipulator func(t *testing.T, transaction *requestTransaction)
+ want string
}{
{
name: "give a string",
+ manipulator: func(t *testing.T, transaction *requestTransaction) {
+ transaction.setCompletionFuture(nil)
+ },
want: `
╔═requestTransaction═════════╗
║╔═transactionId╗╔═completed╗║
@@ -164,15 +168,17 @@ func Test_requestTransaction_String(t1 *testing.T) {
},
}
for _, tt := range tests {
- t1.Run(tt.name, func(t1 *testing.T) {
- t := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: testutils.ProduceTestingLogger(t1),
+ t.Run(tt.name, func(t1 *testing.T) {
+ _t := &requestTransaction{
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ transactionLog: testutils.ProduceTestingLogger(t1),
}
- if got := t.String(); got != tt.want {
+ if tt.manipulator != nil {
+ tt.manipulator(t, _t)
+ }
+ if got := _t.String(); got != tt.want {
t1.Errorf("String() = \n%v, want \n%v", got, tt.want)
}
})
@@ -181,12 +187,11 @@ func Test_requestTransaction_String(t1 *testing.T) {
func Test_requestTransaction_Submit(t1 *testing.T) {
type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
- transactionLog zerolog.Logger
- completed bool
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
+ transactionLog zerolog.Logger
+ completed bool
}
type args struct {
operation RequestTransactionRunnable
@@ -240,12 +245,11 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
for _, tt := range tests {
t1.Run(tt.name, func(t1 *testing.T) {
t := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: tt.fields.transactionLog,
- completed: tt.fields.completed,
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ transactionLog: tt.fields.transactionLog,
+ completed: tt.fields.completed,
}
t.Submit(tt.args.operation)
t.operation()
@@ -255,10 +259,9 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
}
type args struct {
ctx context.Context
@@ -285,13 +288,12 @@ func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
return ctx
}(),
},
- mockSetup: func(t *testing.T, fields *fields, args *args) {
- completionFuture := NewMockCompletionFuture(t)
- expect := completionFuture.EXPECT()
- expect.AwaitCompletion(mock.Anything).Return(nil)
- fields.completionFuture = completionFuture
- },
manipulator: func(t *testing.T, transaction *requestTransaction) {
+ completionFutureMock := NewMockCompletionFuture(t)
+ expect := completionFutureMock.EXPECT()
+ expect.AwaitCompletion(mock.Anything).Return(nil)
+ var completionFuture pool.CompletionFuture = completionFutureMock
+ transaction.completionFuture.Store(&completionFuture)
go func() {
time.Sleep(100 * time.Millisecond)
r := transaction.parent
@@ -308,11 +310,13 @@ func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
tt.mockSetup(t1, &tt.fields, &tt.args)
}
t := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: testutils.ProduceTestingLogger(t1),
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ transactionLog: testutils.ProduceTestingLogger(t1),
+ }
+ if tt.manipulator != nil {
+ tt.manipulator(t1, t)
}
if err := t.AwaitCompletion(tt.args.ctx); (err != nil) != tt.wantErr {
t1.Errorf("AwaitCompletion() error = %v, wantErr %v", err, tt.wantErr)
diff --git a/plc4go/spi/transactions/requestTransaction_plc4xgen.go b/plc4go/spi/transactions/requestTransaction_plc4xgen.go
index d6dd6afa75..733c59fde4 100644
--- a/plc4go/spi/transactions/requestTransaction_plc4xgen.go
+++ b/plc4go/spi/transactions/requestTransaction_plc4xgen.go
@@ -47,8 +47,9 @@ func (d *requestTransaction) SerializeWithWriteBuffer(ctx context.Context, write
return err
}
- if d.completionFuture != nil {
- if serializableField, ok := d.completionFuture.(utils.Serializable); ok {
+ if completionFutureLoaded := d.completionFuture.Load(); completionFutureLoaded != nil && *completionFutureLoaded != nil {
+ completionFuture := *completionFutureLoaded
+ if serializableField, ok := completionFuture.(utils.Serializable); ok {
if err := writeBuffer.PushContext("completionFuture"); err != nil {
return err
}
@@ -59,7 +60,7 @@ func (d *requestTransaction) SerializeWithWriteBuffer(ctx context.Context, write
return err
}
} else {
- stringValue := fmt.Sprintf("%v", d.completionFuture)
+ stringValue := fmt.Sprintf("%v", completionFuture)
if err := writeBuffer.WriteString("completionFuture", uint32(len(stringValue)*8), "UTF-8", stringValue); err != nil {
return err
}