You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 16:57:05 UTC
[plc4x] 05/05: fix(plc4go/spi): fix concurrency issue when an executor is being started and stopped pretty fast
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 5894b08ef7b7040de580e0011a5ef7e891597487
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 18:56:51 2023 +0200
fix(plc4go/spi): fix concurrency issue when an executor is being started and stopped pretty fast
---
plc4go/internal/cbus/CBusMessageMapper_test.go | 181 +++++++++++++++----------
plc4go/spi/pool/executor.go | 2 +
plc4go/spi/pool/worker.go | 15 +-
plc4go/spi/pool/worker_test.go | 51 +++++--
4 files changed, 167 insertions(+), 82 deletions(-)
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 588ba63d81..0857832a47 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -21,17 +21,18 @@ package cbus
import (
"fmt"
+ "testing"
+
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/apache/plc4x/plc4go/spi/utils"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
+
"github.com/stretchr/testify/assert"
- "testing"
)
func TestTagToCBusMessage(t *testing.T) {
@@ -1468,12 +1469,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1540,12 +1543,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1614,12 +1619,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1677,12 +1684,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1730,12 +1739,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1786,12 +1797,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1862,12 +1875,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -1956,12 +1971,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2013,12 +2030,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2066,12 +2085,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2119,12 +2140,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2172,12 +2195,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2225,12 +2250,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2298,12 +2325,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2351,12 +2380,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2404,12 +2435,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2457,12 +2490,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2510,12 +2545,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2563,12 +2600,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2632,12 +2671,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2685,12 +2726,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
@@ -2738,12 +2781,14 @@ func TestMapEncodedReply(t *testing.T) {
},
},
setup: func(t *testing.T, args *args) {
- executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+ executor := pool.NewFixedSizeExecutor(10, 50, _options...)
executor.Start()
transactionManager := transactions.NewRequestTransactionManager(
1,
- options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
- transactions.WithCustomExecutor(executor),
+ append(_options,
+ transactions.WithCustomExecutor(executor),
+ )...,
)
t.Cleanup(func() {
assert.NoError(t, transactionManager.Close())
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 794bfb4221..403408f9ca 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -92,11 +92,13 @@ func (e *executor) Start() {
}
e.running = true
e.shutdown = false
+ e.log.Debug().Msgf("Starting %d workers", len(e.worker))
for i := 0; i < len(e.worker); i++ {
_worker := e.worker[i]
_worker.initialize()
_worker.start()
}
+ e.log.Trace().Msg("started")
}
func (e *executor) Stop() {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 6017f6c78c..f58e8dcbde 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -50,23 +50,27 @@ type worker struct {
}
func (w *worker) initialize() {
+ w.stateChange.Lock()
+ defer w.stateChange.Unlock()
+ w.lastReceived.Store(time.Now())
+ w.running.Store(false)
w.shutdown.Store(false)
w.interrupted.Store(false)
w.interrupter = make(chan struct{}, 1)
- w.lastReceived.Store(time.Now())
}
func (w *worker) start() {
w.stateChange.Lock()
defer w.stateChange.Unlock()
if w.running.Load() {
- log.Warn().Msg("Worker already started")
+ log.Warn().Int("Worker id", w.id).Msg("Worker already started")
return
}
if w.executor.isTraceWorkers() {
w.log.Debug().Msgf("Starting worker\n%s", w)
}
w.executor.getWorkerWaitGroup().Add(1)
+ w.running.Store(true)
go w.work()
}
@@ -74,9 +78,13 @@ func (w *worker) stop(interrupt bool) {
w.stateChange.Lock()
defer w.stateChange.Unlock()
if !w.running.Load() {
- w.log.Warn().Msg("Worker not running")
+ w.log.Warn().Int("Worker id", w.id).Msg("Worker not running")
return
}
+
+ if w.executor.isTraceWorkers() {
+ w.log.Debug().Msgf("Stopping worker\n%s", w)
+ }
w.shutdown.Store(true)
if interrupt {
w.interrupted.Store(true)
@@ -95,7 +103,6 @@ func (w *worker) work() {
}
}
}()
- w.running.Store(true)
defer w.running.Store(false)
workerLog := w.log.With().Int("Worker id", w.id).Logger()
if !w.executor.isTraceWorkers() {
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index d781466b43..7e24fd4077 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -82,24 +82,38 @@ func Test_worker_start(t *testing.T) {
fields: fields{
executor: func() *executor {
e := &executor{
- workItems: make(chan workItem),
+ workItems: make(chan workItem, 1),
traceWorkers: true,
}
- go func() {
- e.workItems <- workItem{
- workItemId: 0,
- runnable: func() {
- // No-op
- },
- completionFuture: &future{},
- }
- }()
+ e.workItems <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ // No-op
+ },
+ completionFuture: &future{},
+ }
return e
}(),
},
},
{
name: "start started",
+ fields: fields{
+ executor: func() *executor {
+ e := &executor{
+ workItems: make(chan workItem, 1),
+ traceWorkers: true,
+ }
+ e.workItems <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ // No-op
+ },
+ completionFuture: &future{},
+ }
+ return e
+ }(),
+ },
manipulator: func(t *testing.T, worker *worker) {
worker.running.Store(true)
},
@@ -149,6 +163,22 @@ func Test_worker_stop(t *testing.T) {
name: "stop started",
fields: fields{
interrupter: make(chan struct{}),
+ executor: func() *executor {
+ e := &executor{
+ workItems: make(chan workItem),
+ traceWorkers: true,
+ }
+ go func() {
+ e.workItems <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ // No-op
+ },
+ completionFuture: &future{},
+ }
+ }()
+ return e
+ }(),
},
manipulator: func(t *testing.T, worker *worker) {
worker.running.Store(true)
@@ -318,6 +348,7 @@ func Test_worker_work(t *testing.T) {
log: produceTestingLogger(t),
}
w.executor.getWorkerWaitGroup().Add(1)
+ w.running.Store(true)
go w.work()
if tt.firstValidation != nil {
time.Sleep(tt.timeBeforeFirstValidation)