Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/03 08:41:08 UTC
[plc4x] 02/02: fix(plc4go/spi): make shutdown of WorkerPool more reliable
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 7d745dae3c663ab5c7b4a26a4a444c78fc8433ed
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Sat Jun 3 10:40:58 2023 +0200
fix(plc4go/spi): make shutdown of WorkerPool more reliable
---
plc4go/spi/pool/WorkerPool.go | 138 +++++++++++++++++++++++++++-------
plc4go/spi/pool/WorkerPool_test.go | 147 ++++++++++++++++++++++++++++++++++++-
2 files changed, 258 insertions(+), 27 deletions(-)
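
The heart of this change: Stop() used to close the work-item channel and return while worker goroutines were still winding down. The commit instead has every worker register with a sync.WaitGroup, so Stop() can block until all of them have actually exited. A minimal, self-contained sketch of that pattern (names here are illustrative, not taken from the patch):

    package main

    import (
        "fmt"
        "sync"
    )

    // pool is a stripped-down stand-in for the executor in this patch:
    // workers drain a job channel and watch an interrupter for shutdown.
    type pool struct {
        jobs        chan func()
        interrupter chan struct{}
        workers     sync.WaitGroup // every worker registers here
    }

    func newPool(workers, depth int) *pool {
        p := &pool{
            jobs:        make(chan func(), depth),
            interrupter: make(chan struct{}),
        }
        for i := 0; i < workers; i++ {
            p.workers.Add(1) // register before spawning, so Wait() cannot run ahead
            go func() {
                defer p.workers.Done()
                for {
                    select {
                    case job := <-p.jobs:
                        job()
                    case <-p.interrupter:
                        return
                    }
                }
            }()
        }
        return p
    }

    // stop interrupts all workers and, crucially, waits for them to exit
    // instead of returning while goroutines are still running.
    func (p *pool) stop() {
        close(p.interrupter)
        p.workers.Wait()
    }

    func main() {
        p := newPool(4, 16)
        p.jobs <- func() { fmt.Println("work") } // may or may not run before shutdown
        p.stop()                                 // returns only after all workers have terminated
    }

The same idea appears twice in the patch below: workerWaitGroup tracks the plain workers, and dynamicWorkers tracks the spawner/killer goroutines of the dynamic executor.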
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index c9c031efa1..bcf143bef8 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"io"
@@ -42,6 +43,7 @@ type worker struct {
executor interface {
isTraceWorkers() bool
getWorksItems() chan workItem
+ getWorkerWaitGroup() *sync.WaitGroup
}
hasEnded atomic.Bool
lastReceived time.Time
@@ -58,6 +60,8 @@ func (w *worker) initialize() {
}
func (w *worker) work() {
+ w.executor.getWorkerWaitGroup().Add(1)
+ defer w.executor.getWorkerWaitGroup().Done()
defer func() {
if recovered := recover(); recovered != nil {
w.log.Error().Msgf("Recovering from panic():%v. Stack: %s", recovered, debug.Stack())
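
Here work() registers itself on the executor's WaitGroup as its first action and deregisters via defer. As a general Go note (the pool's start-up sequencing may already rule this out here): when Add(1) runs inside the spawned goroutine rather than in the spawner, a Wait() that races ahead of the goroutine's first instruction can observe a zero counter and return early. The conventional ordering looks like this:

    package main

    import "sync"

    func main() {
        var wg sync.WaitGroup
        wg.Add(1) // increment in the spawner, before the goroutine starts
        go func() {
            defer wg.Done()
            // ... worker loop ...
        }()
        wg.Wait() // cannot return before the goroutine has run Done()
    }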
@@ -104,8 +108,8 @@ type workItem struct {
completionFuture *future
}
-func (w *workItem) String() string {
- return fmt.Sprintf("Workitem{wid:%d}", w.workItemId)
+func (w workItem) String() string {
+ return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
}
type Executor interface {
@@ -117,13 +121,15 @@ type Executor interface {
}
type executor struct {
- maxNumberOfWorkers int
- running bool
- shutdown bool
- stateChange sync.Mutex
- worker []*worker
- workItems chan workItem
- traceWorkers bool
+ running bool
+ shutdown bool
+ stateChange sync.Mutex
+ worker []*worker
+ queueDepth int
+ workItems chan workItem
+ traceWorkers bool
+
+ workerWaitGroup sync.WaitGroup
log zerolog.Logger
}
@@ -136,9 +142,19 @@ func (e *executor) getWorksItems() chan workItem {
return e.workItems
}
+func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
+ return &e.workerWaitGroup
+}
+
type dynamicExecutor struct {
- executor
- maxNumberOfWorkers int
+ *executor
+
+ maxNumberOfWorkers int
+ currentNumberOfWorkers atomic.Int32
+ dynamicStateChange sync.Mutex
+ interrupter chan struct{}
+
+ dynamicWorkers sync.WaitGroup
}
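
dynamicExecutor now embeds *executor by pointer instead of by value. The embedded struct carries a sync.Mutex and, after this patch, a sync.WaitGroup, neither of which may be copied, and dynamicExecutor.Stop() further down needs to observe the same running/shutdown flags that the inner executor mutates. A small sketch of why the pointer embed matters (illustrative names, not from the patch):

    package main

    import (
        "fmt"
        "sync"
    )

    // state carries a mutex, so it must not be copied; embedding it by
    // pointer lets several wrappers coordinate through one shared instance.
    type state struct {
        mu      sync.Mutex
        running bool
    }

    type wrapper struct{ *state } // pointer embed: shared and copy-safe

    func main() {
        s := &state{}
        a, b := wrapper{s}, wrapper{s}
        a.mu.Lock()
        a.running = true
        a.mu.Unlock()
        b.mu.Lock()
        fmt.Println(b.running) // true: a and b see the same state
        b.mu.Unlock()
    }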
func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
@@ -151,10 +167,10 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
}
}
_executor := &executor{
- maxNumberOfWorkers: numberOfWorkers,
- workItems: make(chan workItem, queueDepth),
- worker: workers,
- log: customLogger,
+ queueDepth: queueDepth,
+ workItems: make(chan workItem, queueDepth),
+ worker: workers,
+ log: customLogger,
}
for _, option := range _options {
switch option := option.(type) {
@@ -175,11 +191,10 @@ var timeToBecomeUnused = 5 * time.Second
func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
customLogger := options.ExtractCustomLogger(_options...)
_executor := &dynamicExecutor{
- executor: executor{
- maxNumberOfWorkers: maxNumberOfWorkers,
- workItems: make(chan workItem, queueDepth),
- worker: make([]*worker, 0),
- log: customLogger,
+ executor: &executor{
+ workItems: make(chan workItem, queueDepth),
+ worker: make([]*worker, 0),
+ log: customLogger,
},
maxNumberOfWorkers: maxNumberOfWorkers,
}
@@ -240,6 +255,7 @@ func (e *executor) Start() {
e.stateChange.Lock()
defer e.stateChange.Unlock()
if e.running || e.shutdown {
+ e.log.Warn().Msg("Already started")
return
}
e.running = true
@@ -252,10 +268,25 @@ func (e *executor) Start() {
}
func (e *dynamicExecutor) Start() {
+ e.dynamicStateChange.Lock()
+ defer e.dynamicStateChange.Unlock()
+ if e.running || e.shutdown {
+ e.log.Warn().Msg("Already started")
+ return
+ }
+ if e.interrupter != nil {
+ e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
+ close(e.interrupter)
+ e.dynamicWorkers.Wait()
+ }
+
e.executor.Start()
mutex := sync.Mutex{}
+ e.interrupter = make(chan struct{})
// Worker spawner
go func() {
+ e.dynamicWorkers.Add(1)
+ defer e.dynamicWorkers.Done()
defer func() {
if err := recover(); err != nil {
e.log.Error().Msgf("panic-ed %v", err)
@@ -266,13 +297,13 @@ func (e *dynamicExecutor) Start() {
workerLog = zerolog.Nop()
}
for e.running && !e.shutdown {
- workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
- time.Sleep(upScaleInterval)
+ workerLog.Trace().Msg("running")
mutex.Lock()
numberOfItemsInQueue := len(e.workItems)
numberOfWorkers := len(e.worker)
- workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+ workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+ workerLog.Trace().Msg("spawning new worker")
_worker := &worker{
id: numberOfWorkers - 1,
interrupter: make(chan struct{}, 1),
@@ -284,14 +315,28 @@ func (e *dynamicExecutor) Start() {
_worker.initialize()
workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
go _worker.work()
+ e.currentNumberOfWorkers.Add(1)
} else {
workerLog.Trace().Msg("Nothing to scale")
}
mutex.Unlock()
+ func() {
+ workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+ timer := time.NewTimer(upScaleInterval)
+ defer utils.CleanupTimer(timer)
+ select {
+ case <-timer.C:
+ case <-e.interrupter:
+ workerLog.Info().Msg("interrupted")
+ }
+ }()
}
+ workerLog.Info().Msg("Terminated")
}()
// Worker killer
go func() {
+ e.dynamicWorkers.Add(1)
+ defer e.dynamicWorkers.Done()
defer func() {
if err := recover(); err != nil {
e.log.Error().Msgf("panic-ed %v", err)
@@ -302,8 +347,7 @@ func (e *dynamicExecutor) Start() {
workerLog = zerolog.Nop()
}
for e.running && !e.shutdown {
- workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
- time.Sleep(downScaleInterval)
+ workerLog.Trace().Msg("running")
mutex.Lock()
newWorkers := make([]*worker, 0)
for _, _worker := range e.worker {
@@ -313,6 +357,7 @@ func (e *dynamicExecutor) Start() {
workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
_worker.interrupted.Store(true)
close(_worker.interrupter)
+ e.currentNumberOfWorkers.Add(-1)
} else {
workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
newWorkers = append(newWorkers, _worker)
@@ -320,14 +365,27 @@ func (e *dynamicExecutor) Start() {
}
e.worker = newWorkers
mutex.Unlock()
+ func() {
+ workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+ timer := time.NewTimer(downScaleInterval)
+ defer utils.CleanupTimer(timer)
+ select {
+ case <-timer.C:
+ case <-e.interrupter:
+ workerLog.Info().Msg("interrupted")
+ }
+ }()
}
+ workerLog.Info().Msg("Terminated")
}()
}
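
Both scaling loops previously slept unconditionally with time.Sleep, so a Stop() had to wait out the remainder of upScaleInterval or downScaleInterval. The timer-plus-select rewrite above lets the new interrupter channel cut the sleep short. utils.CleanupTimer (added to the imports at the top of the patch) is not shown here; this standalone sketch uses the standard library's timer.Stop as a stand-in:

    package main

    import (
        "fmt"
        "time"
    )

    // interruptibleSleep blocks for d, or returns early when interrupter
    // closes: the pattern that replaces the bare time.Sleep calls above.
    func interruptibleSleep(d time.Duration, interrupter <-chan struct{}) {
        timer := time.NewTimer(d)
        defer timer.Stop() // stand-in for utils.CleanupTimer in the patch
        select {
        case <-timer.C:
            fmt.Println("interval elapsed")
        case <-interrupter:
            fmt.Println("interrupted early")
        }
    }

    func main() {
        interrupter := make(chan struct{})
        go func() {
            time.Sleep(10 * time.Millisecond)
            close(interrupter) // Stop() closes this channel in the patch
        }()
        interruptibleSleep(5*time.Second, interrupter) // returns after ~10ms
    }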
func (e *executor) Stop() {
+ e.log.Trace().Msg("stopping now")
e.stateChange.Lock()
defer e.stateChange.Unlock()
if !e.running || e.shutdown {
+ e.log.Warn().Msg("already stopped")
return
}
e.shutdown = true
@@ -337,9 +395,27 @@ func (e *executor) Stop() {
worker.interrupted.Store(true)
close(worker.interrupter)
}
- close(e.workItems)
e.running = false
e.shutdown = false
+ e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
+ e.workerWaitGroup.Wait()
+ e.log.Trace().Msg("stopped")
+}
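
Note the dropped close(e.workItems), arguably as much a part of the reliability fix as the added Wait(): closing the queue while a submitter might still be sending is a classic hazard, since a send on a closed channel panics. Keeping the channel open and stopping workers through their interrupters avoids it. Demonstrated in isolation:

    package main

    import "fmt"

    func main() {
        ch := make(chan func(), 1)
        close(ch)
        defer func() {
            // Sending on a closed channel panics; this is the race the old
            // Stop() (which closed workItems) exposed to concurrent submitters.
            fmt.Println("recovered:", recover())
        }()
        ch <- func() {} // panic: send on closed channel
    }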
+
+func (e *dynamicExecutor) Stop() {
+ e.log.Trace().Msg("stopping now")
+ e.dynamicStateChange.Lock()
+ defer e.dynamicStateChange.Unlock()
+ if !e.running || e.shutdown {
+ e.log.Warn().Msg("already stopped")
+ return
+ }
+ close(e.interrupter)
+ e.log.Trace().Msg("stopping inner executor")
+ e.executor.Stop()
+ e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
+ e.dynamicWorkers.Wait()
+ e.log.Trace().Msg("stopped")
}
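
The shutdown order here matters: closing the interrupter first wakes the spawner and killer out of their scale-interval sleeps, e.executor.Stop() then interrupts the workers and waits on workerWaitGroup, and dynamicWorkers.Wait() finally confirms the two scaling goroutines are gone. The already-stopped guard at the top mirrors the one in executor.Stop(), making a second Stop() a harmless no-op; a condensed sketch of that guard pattern:

    package main

    import (
        "fmt"
        "sync"
    )

    // stopper sketches the double-stop guard used above: state flags are
    // checked under a mutex so a repeated stop() returns immediately.
    type stopper struct {
        mu      sync.Mutex
        running bool
    }

    func (s *stopper) stop() {
        s.mu.Lock()
        defer s.mu.Unlock()
        if !s.running {
            fmt.Println("already stopped")
            return
        }
        s.running = false
        fmt.Println("stopped")
    }

    func main() {
        s := &stopper{running: true}
        s.stop() // stopped
        s.stop() // already stopped
    }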
func (e *executor) Close() error {
@@ -395,3 +471,13 @@ func (f *future) AwaitCompletion(ctx context.Context) error {
}
return nil
}
+
+func (f *future) String() string {
+ return fmt.Sprintf("future: cancelRequested(%t), interruptRequested(%t), completed(%t), errored(%t), err(%v)",
+ f.cancelRequested.Load(),
+ f.interruptRequested.Load(),
+ f.completed.Load(),
+ f.errored.Load(),
+ f.err.Load(),
+ )
+}
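
The new String() on future and the richer workItem format above make pool state self-describing in log output: any type with a String() string method satisfies fmt.Stringer and is rendered by fmt's %v/%s verbs, which zerolog's Msgf uses under the hood. A minimal illustration (hypothetical type name, not from the patch):

    package main

    import "fmt"

    type workItemInfo struct {
        id       int32
        runnable bool
    }

    // String implements fmt.Stringer, so %v prints the description below.
    func (w workItemInfo) String() string {
        return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}", w.id, w.runnable)
    }

    func main() {
        fmt.Printf("%v\n", workItemInfo{id: 1, runnable: true})
        // Output: Workitem{wid:1, runnable(true)}
    }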
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index b084e61d56..9a62a09d25 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -420,7 +420,7 @@ func TestWorkItem_String(t *testing.T) {
}{
{
name: "Simple test",
- want: "Workitem{wid:0}",
+ want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
},
}
for _, tt := range tests {
@@ -743,6 +743,151 @@ func Test_future_complete(t *testing.T) {
}
}
+func Test_dynamicExecutor_Start(t *testing.T) {
+ type fields struct {
+ executor *executor
+ maxNumberOfWorkers int
+ }
+ tests := []struct {
+ name string
+ fields fields
+ setup func(t *testing.T, fields *fields)
+ startTwice bool
+ }{
+ {
+ name: "just start",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "start twice",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ startTwice: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
+ e := &dynamicExecutor{
+ executor: tt.fields.executor,
+ maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+ }
+ e.Start()
+ if tt.startTwice {
+ e.Start()
+ }
+ // Let it work a bit
+ time.Sleep(20 * time.Millisecond)
+ t.Log("done with test")
+ t.Cleanup(e.Stop)
+ })
+ }
+}
+
+func Test_dynamicExecutor_Stop(t *testing.T) {
+ type fields struct {
+ executor *executor
+ maxNumberOfWorkers int
+ interrupter chan struct{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ setup func(t *testing.T, fields *fields)
+ startIt bool
+ stopTwice bool
+ }{
+ {
+ name: "just stop",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "stop started",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "stop twice",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ stopTwice: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
+ e := &dynamicExecutor{
+ executor: tt.fields.executor,
+ maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+ interrupter: tt.fields.interrupter,
+ }
+ if tt.startIt {
+ e.Start()
+ }
+ e.Stop()
+ if tt.stopTwice {
+ e.Stop()
+ }
+ })
+ }
+}
+
// from: https://github.com/golang/go/issues/36532#issuecomment-575535452
func testContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())