You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/19 16:57:05 UTC

[plc4x] 05/05: fix(plc4go/spi): fix concurrency issue when an executor is being started and stopped pretty fast

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 5894b08ef7b7040de580e0011a5ef7e891597487
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jun 19 18:56:51 2023 +0200

    fix(plc4go/spi): fix concurrency issue when an executor is being started and stopped pretty fast
---
 plc4go/internal/cbus/CBusMessageMapper_test.go | 181 +++++++++++++++----------
 plc4go/spi/pool/executor.go                    |   2 +
 plc4go/spi/pool/worker.go                      |  15 +-
 plc4go/spi/pool/worker_test.go                 |  51 +++++--
 4 files changed, 167 insertions(+), 82 deletions(-)

diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 588ba63d81..0857832a47 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -21,17 +21,18 @@ package cbus
 
 import (
 	"fmt"
+	"testing"
+
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	apiValues "github.com/apache/plc4x/plc4go/pkg/api/values"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
-	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
 	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	spiValues "github.com/apache/plc4x/plc4go/spi/values"
+
 	"github.com/stretchr/testify/assert"
-	"testing"
 )
 
 func TestTagToCBusMessage(t *testing.T) {
@@ -1468,12 +1469,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1540,12 +1543,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1614,12 +1619,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1677,12 +1684,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1730,12 +1739,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1786,12 +1797,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1862,12 +1875,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -1956,12 +1971,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2013,12 +2030,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2066,12 +2085,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2119,12 +2140,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2172,12 +2195,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2225,12 +2250,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2298,12 +2325,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2351,12 +2380,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2404,12 +2435,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2457,12 +2490,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2510,12 +2545,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2563,12 +2600,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2632,12 +2671,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2685,12 +2726,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
@@ -2738,12 +2781,14 @@ func TestMapEncodedReply(t *testing.T) {
 				},
 			},
 			setup: func(t *testing.T, args *args) {
-				executor := pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				executor := pool.NewFixedSizeExecutor(10, 50, _options...)
 				executor.Start()
 				transactionManager := transactions.NewRequestTransactionManager(
 					1,
-					options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
-					transactions.WithCustomExecutor(executor),
+					append(_options,
+						transactions.WithCustomExecutor(executor),
+					)...,
 				)
 				t.Cleanup(func() {
 					assert.NoError(t, transactionManager.Close())
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 794bfb4221..403408f9ca 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -92,11 +92,13 @@ func (e *executor) Start() {
 	}
 	e.running = true
 	e.shutdown = false
+	e.log.Debug().Msgf("Starting %d workers", len(e.worker))
 	for i := 0; i < len(e.worker); i++ {
 		_worker := e.worker[i]
 		_worker.initialize()
 		_worker.start()
 	}
+	e.log.Trace().Msg("started")
 }
 
 func (e *executor) Stop() {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 6017f6c78c..f58e8dcbde 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -50,23 +50,27 @@ type worker struct {
 }
 
 func (w *worker) initialize() {
+	w.stateChange.Lock()
+	defer w.stateChange.Unlock()
+	w.lastReceived.Store(time.Now())
+	w.running.Store(false)
 	w.shutdown.Store(false)
 	w.interrupted.Store(false)
 	w.interrupter = make(chan struct{}, 1)
-	w.lastReceived.Store(time.Now())
 }
 
 func (w *worker) start() {
 	w.stateChange.Lock()
 	defer w.stateChange.Unlock()
 	if w.running.Load() {
-		log.Warn().Msg("Worker already started")
+		log.Warn().Int("Worker id", w.id).Msg("Worker already started")
 		return
 	}
 	if w.executor.isTraceWorkers() {
 		w.log.Debug().Msgf("Starting worker\n%s", w)
 	}
 	w.executor.getWorkerWaitGroup().Add(1)
+	w.running.Store(true)
 	go w.work()
 }
 
@@ -74,9 +78,13 @@ func (w *worker) stop(interrupt bool) {
 	w.stateChange.Lock()
 	defer w.stateChange.Unlock()
 	if !w.running.Load() {
-		w.log.Warn().Msg("Worker not running")
+		w.log.Warn().Int("Worker id", w.id).Msg("Worker not running")
 		return
 	}
+
+	if w.executor.isTraceWorkers() {
+		w.log.Debug().Msgf("Stopping worker\n%s", w)
+	}
 	w.shutdown.Store(true)
 	if interrupt {
 		w.interrupted.Store(true)
@@ -95,7 +103,6 @@ func (w *worker) work() {
 			}
 		}
 	}()
-	w.running.Store(true)
 	defer w.running.Store(false)
 	workerLog := w.log.With().Int("Worker id", w.id).Logger()
 	if !w.executor.isTraceWorkers() {
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index d781466b43..7e24fd4077 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -82,24 +82,38 @@ func Test_worker_start(t *testing.T) {
 			fields: fields{
 				executor: func() *executor {
 					e := &executor{
-						workItems:    make(chan workItem),
+						workItems:    make(chan workItem, 1),
 						traceWorkers: true,
 					}
-					go func() {
-						e.workItems <- workItem{
-							workItemId: 0,
-							runnable: func() {
-								// No-op
-							},
-							completionFuture: &future{},
-						}
-					}()
+					e.workItems <- workItem{
+						workItemId: 0,
+						runnable: func() {
+							// No-op
+						},
+						completionFuture: &future{},
+					}
 					return e
 				}(),
 			},
 		},
 		{
 			name: "start started",
+			fields: fields{
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem, 1),
+						traceWorkers: true,
+					}
+					e.workItems <- workItem{
+						workItemId: 0,
+						runnable: func() {
+							// No-op
+						},
+						completionFuture: &future{},
+					}
+					return e
+				}(),
+			},
 			manipulator: func(t *testing.T, worker *worker) {
 				worker.running.Store(true)
 			},
@@ -149,6 +163,22 @@ func Test_worker_stop(t *testing.T) {
 			name: "stop started",
 			fields: fields{
 				interrupter: make(chan struct{}),
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								// No-op
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
 			},
 			manipulator: func(t *testing.T, worker *worker) {
 				worker.running.Store(true)
@@ -318,6 +348,7 @@ func Test_worker_work(t *testing.T) {
 				log:         produceTestingLogger(t),
 			}
 			w.executor.getWorkerWaitGroup().Add(1)
+			w.running.Store(true)
 			go w.work()
 			if tt.firstValidation != nil {
 				time.Sleep(tt.timeBeforeFirstValidation)