You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/14 18:30:44 UTC
[plc4x] branch develop updated: fix(plc4go/spi): fix data race in executor
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 860a15b0e2 fix(plc4go/spi): fix data race in executor
860a15b0e2 is described below
commit 860a15b0e2c7f9d676aa589363e55e2024c40dd9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Jun 14 20:30:37 2023 +0200
fix(plc4go/spi): fix data race in executor
---
plc4go/spi/pool/WorkerPool.go | 19 +++++++++++--------
plc4go/spi/pool/dynamicExecutor.go | 18 +++++++++---------
plc4go/spi/pool/dynamicExecutor_test.go | 9 ++++++++-
plc4go/spi/pool/executor.go | 9 ++++++---
plc4go/spi/pool/executor_test.go | 14 +++++++++-----
plc4go/spi/pool/worker.go | 6 +++---
plc4go/spi/pool/worker_plc4xgen.go | 19 +++++++++++++++++--
plc4go/spi/pool/worker_test.go | 24 ++++++++++++------------
8 files changed, 75 insertions(+), 43 deletions(-)
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 52a0c4df2b..82d0e7a4a1 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -45,10 +45,12 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
workers := make([]*worker, numberOfWorkers)
customLogger := options.ExtractCustomLogger(_options...)
for i := 0; i < numberOfWorkers; i++ {
- workers[i] = &worker{
+ w := worker{
id: i,
log: customLogger,
}
+ w.lastReceived.Store(time.Time{})
+ workers[i] = &w
}
_executor := &executor{
queueDepth: queueDepth,
@@ -85,13 +87,14 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
}
}
// We spawn one initial worker
- _executor.worker = append(_executor.worker, &worker{
- id: 0,
- interrupter: make(chan struct{}, 1),
- executor: _executor,
- lastReceived: time.Now(),
- log: customLogger,
- })
+ w := worker{
+ id: 0,
+ interrupter: make(chan struct{}, 1),
+ executor: _executor,
+ log: customLogger,
+ }
+ w.lastReceived.Store(time.Now())
+ _executor.worker = append(_executor.worker, &w)
return _executor
}
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 3af93a9244..12a39665ca 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -73,7 +73,7 @@ func (e *dynamicExecutor) Start() {
if !e.traceWorkers {
workerLog = zerolog.Nop()
}
- for e.running && !e.shutdown {
+ for e.IsRunning() {
workerLog.Trace().Msg("running")
mutex.Lock()
numberOfItemsInQueue := len(e.workItems)
@@ -82,12 +82,12 @@ func (e *dynamicExecutor) Start() {
if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
workerLog.Trace().Msg("spawning new worker")
_worker := &worker{
- id: numberOfWorkers - 1,
- interrupter: make(chan struct{}, 1),
- executor: e,
- lastReceived: time.Now(),
- log: e.log,
+ id: numberOfWorkers - 1,
+ interrupter: make(chan struct{}, 1),
+ executor: e,
+ log: e.log,
}
+ _worker.lastReceived.Store(time.Now())
e.worker = append(e.worker, _worker)
_worker.initialize()
workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
@@ -123,14 +123,14 @@ func (e *dynamicExecutor) Start() {
if !e.traceWorkers {
workerLog = zerolog.Nop()
}
- for e.running && !e.shutdown {
+ for e.IsRunning() {
workerLog.Trace().Msg("running")
mutex.Lock()
newWorkers := make([]*worker, 0)
for _, _worker := range e.worker {
deadline := time.Now().Add(-timeToBecomeUnused)
- workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
- if _worker.lastReceived.Before(deadline) {
+ workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived.Load(), deadline)
+ if _worker.lastReceived.Load().(time.Time).Before(deadline) {
workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
_worker.interrupted.Store(true)
close(_worker.interrupter)
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 531867a71f..f1c8e4b4b2 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -21,6 +21,7 @@ package pool
import (
"github.com/stretchr/testify/assert"
+ "sync/atomic"
"testing"
"time"
)
@@ -185,7 +186,13 @@ func Test_dynamicExecutor_String(t *testing.T) {
fields: fields{
executor: &executor{
worker: []*worker{
- {},
+ {
+ lastReceived: func() atomic.Value {
+ value := atomic.Value{}
+ value.Store(time.Time{})
+ return value
+ }(),
+ },
},
},
maxNumberOfWorkers: 3,
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 104c7feae6..654edf2973 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -30,14 +30,15 @@ import (
//go:generate go run ../../tools/plc4xgenerator/gen.go -type=executor
type executor struct {
- running bool
- shutdown bool
- stateChange sync.Mutex
+ running bool
+ shutdown bool
+
worker []*worker
queueDepth int
workItems chan workItem
traceWorkers bool
+ stateChange sync.RWMutex
workerWaitGroup sync.WaitGroup
log zerolog.Logger `ignore:"true"`
@@ -126,5 +127,7 @@ func (e *executor) Close() error {
}
func (e *executor) IsRunning() bool {
+ e.stateChange.RLock()
+ defer e.stateChange.RUnlock()
return e.running && !e.shutdown
}
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 229a7d9797..8bd13d6b0a 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -435,11 +435,15 @@ func Test_executor_String(t *testing.T) {
shutdown: true,
worker: []*worker{
{
- id: 1,
- shutdown: atomic.Bool{},
- interrupted: atomic.Bool{},
- hasEnded: atomic.Bool{},
- lastReceived: time.Time{},
+ id: 1,
+ shutdown: atomic.Bool{},
+ interrupted: atomic.Bool{},
+ hasEnded: atomic.Bool{},
+ lastReceived: func() atomic.Value {
+ value := atomic.Value{}
+ value.Store(time.Time{})
+ return value
+ }(),
},
},
queueDepth: 2,
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 15313dd6ee..0fa5ae597b 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -40,7 +40,7 @@ type worker struct {
getWorkerWaitGroup() *sync.WaitGroup
}
hasEnded atomic.Bool
- lastReceived time.Time `stringer:"true"`
+ lastReceived atomic.Value
log zerolog.Logger `ignore:"true"`
}
@@ -50,7 +50,7 @@ func (w *worker) initialize() {
w.interrupted.Store(false)
w.interrupter = make(chan struct{}, 1)
w.hasEnded.Store(false)
- w.lastReceived = time.Now()
+ w.lastReceived.Store(time.Now())
}
func (w *worker) work() {
@@ -77,7 +77,7 @@ func (w *worker) work() {
workerLog.Debug().Msg("Working")
select {
case _workItem := <-w.executor.getWorksItems():
- w.lastReceived = time.Now()
+ w.lastReceived.Store(time.Now())
workerLog.Debug().Msgf("Got work item %v", _workItem)
if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
workerLog.Debug().Msg("We need to stop")
diff --git a/plc4go/spi/pool/worker_plc4xgen.go b/plc4go/spi/pool/worker_plc4xgen.go
index 85a6f1767c..23e8d1689c 100644
--- a/plc4go/spi/pool/worker_plc4xgen.go
+++ b/plc4go/spi/pool/worker_plc4xgen.go
@@ -64,8 +64,23 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
return err
}
- if err := writeBuffer.WriteString("lastReceived", uint32(len(d.lastReceived.String())*8), "UTF-8", d.lastReceived.String()); err != nil {
- return err
+ if d.lastReceived.Load() != nil {
+ if serializableField, ok := d.lastReceived.Load().(utils.Serializable); ok {
+ if err := writeBuffer.PushContext("lastReceived"); err != nil {
+ return err
+ }
+ if err := serializableField.SerializeWithWriteBuffer(ctx, writeBuffer); err != nil {
+ return err
+ }
+ if err := writeBuffer.PopContext("lastReceived"); err != nil {
+ return err
+ }
+ } else {
+ stringValue := fmt.Sprintf("%v", d.lastReceived.Load())
+ if err := writeBuffer.WriteString("lastReceived", uint32(len(stringValue)*8), "UTF-8", stringValue); err != nil {
+ return err
+ }
+ }
}
if err := writeBuffer.PopContext("worker"); err != nil {
return err
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 446885028b..b7729f3e3d 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -24,7 +24,6 @@ import (
"testing"
"time"
- "github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)
@@ -37,8 +36,6 @@ func Test_worker_initialize(t *testing.T) {
getWorksItems() chan workItem
getWorkerWaitGroup() *sync.WaitGroup
}
- lastReceived time.Time
- log zerolog.Logger
}
tests := []struct {
name string
@@ -51,11 +48,10 @@ func Test_worker_initialize(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &worker{
- id: tt.fields.id,
- interrupter: tt.fields.interrupter,
- executor: tt.fields.executor,
- lastReceived: tt.fields.lastReceived,
- log: tt.fields.log,
+ id: tt.fields.id,
+ interrupter: tt.fields.interrupter,
+ executor: tt.fields.executor,
+ log: produceTestLogger(t),
}
w.initialize()
})
@@ -205,6 +201,7 @@ func Test_worker_work(t *testing.T) {
id: tt.fields.id,
interrupter: make(chan struct{}, 1),
executor: tt.fields.executor,
+ log: produceTestLogger(t),
}
go w.work()
if tt.firstValidation != nil {
@@ -222,14 +219,16 @@ func Test_worker_work(t *testing.T) {
t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
tt.secondValidation(t, w)
}
+
+ close(w.interrupter)
+ time.Sleep(50 * time.Millisecond) // TODO: replace with worker stop... (which in turn essures the worker is not running anymore)
})
}
}
func Test_worker_String(t *testing.T) {
type fields struct {
- id int
- lastReceived time.Time
+ id int
}
tests := []struct {
name string
@@ -249,9 +248,10 @@ func Test_worker_String(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &worker{
- id: tt.fields.id,
- lastReceived: tt.fields.lastReceived,
+ id: tt.fields.id,
+ log: produceTestLogger(t),
}
+ w.lastReceived.Store(time.Time{})
assert.Equalf(t, tt.want, w.String(), "String()")
})
}