You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 10:22:40 UTC
[plc4x] branch develop updated: fix(plc4go/spi): properly shutdown worker spawner and killer on shutdown
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new d9584bcde7 fix(plc4go/spi): properly shutdown worker spawner and killer on shutdown
d9584bcde7 is described below
commit d9584bcde767b716a46f1554b2cf90f79335bab9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 12:22:32 2023 +0200
fix(plc4go/spi): properly shutdown worker spawner and killer on shutdown
---
plc4go/spi/pool/WorkerPool.go | 183 +++++++++++++++++++++----------------
plc4go/spi/pool/WorkerPool_test.go | 18 ++--
2 files changed, 112 insertions(+), 89 deletions(-)
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index c1881f1297..c9c031efa1 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -35,11 +35,14 @@ import (
type Runnable func()
type worker struct {
- id int
- shutdown atomic.Bool
- interrupted atomic.Bool
- interrupter chan struct{}
- executor *executor
+ id int
+ shutdown atomic.Bool
+ interrupted atomic.Bool
+ interrupter chan struct{}
+ executor interface {
+ isTraceWorkers() bool
+ getWorksItems() chan workItem
+ }
hasEnded atomic.Bool
lastReceived time.Time
@@ -65,7 +68,7 @@ func (w *worker) work() {
}
}()
workerLog := w.log.With().Int("Worker id", w.id).Logger()
- if !w.executor.traceWorkers {
+ if !w.executor.isTraceWorkers() {
workerLog = zerolog.Nop()
}
workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
@@ -75,7 +78,7 @@ func (w *worker) work() {
for !w.shutdown.Load() {
workerLog.Debug().Msg("Working")
select {
- case _workItem := <-w.executor.workItems:
+ case _workItem := <-w.executor.getWorksItems():
w.lastReceived = time.Now()
workerLog.Debug().Msgf("Got work item %v", _workItem)
if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
@@ -125,6 +128,19 @@ type executor struct {
log zerolog.Logger
}
+func (e *executor) isTraceWorkers() bool {
+ return e.traceWorkers
+}
+
+func (e *executor) getWorksItems() chan workItem {
+ return e.workItems
+}
+
+type dynamicExecutor struct {
+ executor
+ maxNumberOfWorkers int
+}
+
func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
workers := make([]*worker, numberOfWorkers)
customLogger := options.ExtractCustomLogger(_options...)
@@ -158,11 +174,14 @@ var timeToBecomeUnused = 5 * time.Second
func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
customLogger := options.ExtractCustomLogger(_options...)
- _executor := &executor{
+ _executor := &dynamicExecutor{
+ executor: executor{
+ maxNumberOfWorkers: maxNumberOfWorkers,
+ workItems: make(chan workItem, queueDepth),
+ worker: make([]*worker, 0),
+ log: customLogger,
+ },
maxNumberOfWorkers: maxNumberOfWorkers,
- workItems: make(chan workItem, queueDepth),
- worker: make([]*worker, 0),
- log: customLogger,
}
for _, option := range _options {
switch option := option.(type) {
@@ -178,75 +197,6 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
lastReceived: time.Now(),
log: customLogger,
})
- mutex := sync.Mutex{}
- // Worker spawner
- go func() {
- defer func() {
- if err := recover(); err != nil {
- customLogger.Error().Msgf("panic-ed %v", err)
- }
- }()
- workerLog := customLogger.With().Str("Worker type", "spawner").Logger()
- if !_executor.traceWorkers {
- workerLog = zerolog.Nop()
- }
- for {
- workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
- time.Sleep(upScaleInterval)
- mutex.Lock()
- numberOfItemsInQueue := len(_executor.workItems)
- numberOfWorkers := len(_executor.worker)
- workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, maxNumberOfWorkers)
- if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < maxNumberOfWorkers {
- _worker := &worker{
- id: numberOfWorkers - 1,
- interrupter: make(chan struct{}, 1),
- executor: _executor,
- lastReceived: time.Now(),
- log: customLogger,
- }
- _executor.worker = append(_executor.worker, _worker)
- _worker.initialize()
- workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
- go _worker.work()
- } else {
- workerLog.Trace().Msg("Nothing to scale")
- }
- mutex.Unlock()
- }
- }()
- // Worker killer
- go func() {
- defer func() {
- if err := recover(); err != nil {
- _executor.log.Error().Msgf("panic-ed %v", err)
- }
- }()
- workerLog := customLogger.With().Str("Worker type", "killer").Logger()
- if !_executor.traceWorkers {
- workerLog = zerolog.Nop()
- }
- for {
- workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
- time.Sleep(downScaleInterval)
- mutex.Lock()
- newWorkers := make([]*worker, 0)
- for _, _worker := range _executor.worker {
- deadline := time.Now().Add(-timeToBecomeUnused)
- workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
- if _worker.lastReceived.Before(deadline) {
- workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
- _worker.interrupted.Store(true)
- close(_worker.interrupter)
- } else {
- workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
- newWorkers = append(newWorkers, _worker)
- }
- }
- _executor.worker = newWorkers
- mutex.Unlock()
- }
- }()
return _executor
}
@@ -301,6 +251,79 @@ func (e *executor) Start() {
}
}
+func (e *dynamicExecutor) Start() {
+ e.executor.Start()
+ mutex := sync.Mutex{}
+ // Worker spawner
+ go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ e.log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
+ workerLog := e.log.With().Str("Worker type", "spawner").Logger()
+ if !e.traceWorkers {
+ workerLog = zerolog.Nop()
+ }
+ for e.running && !e.shutdown {
+ workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+ time.Sleep(upScaleInterval)
+ mutex.Lock()
+ numberOfItemsInQueue := len(e.workItems)
+ numberOfWorkers := len(e.worker)
+ workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+ if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+ _worker := &worker{
+ id: numberOfWorkers - 1,
+ interrupter: make(chan struct{}, 1),
+ executor: e,
+ lastReceived: time.Now(),
+ log: e.log,
+ }
+ e.worker = append(e.worker, _worker)
+ _worker.initialize()
+ workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
+ go _worker.work()
+ } else {
+ workerLog.Trace().Msg("Nothing to scale")
+ }
+ mutex.Unlock()
+ }
+ }()
+ // Worker killer
+ go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ e.log.Error().Msgf("panic-ed %v", err)
+ }
+ }()
+ workerLog := e.log.With().Str("Worker type", "killer").Logger()
+ if !e.traceWorkers {
+ workerLog = zerolog.Nop()
+ }
+ for e.running && !e.shutdown {
+ workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+ time.Sleep(downScaleInterval)
+ mutex.Lock()
+ newWorkers := make([]*worker, 0)
+ for _, _worker := range e.worker {
+ deadline := time.Now().Add(-timeToBecomeUnused)
+ workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+ if _worker.lastReceived.Before(deadline) {
+ workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
+ _worker.interrupted.Store(true)
+ close(_worker.interrupter)
+ } else {
+ workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
+ newWorkers = append(newWorkers, _worker)
+ }
+ }
+ e.worker = newWorkers
+ mutex.Unlock()
+ }
+ }()
+}
+
func (e *executor) Stop() {
e.stateChange.Lock()
defer e.stateChange.Unlock()
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index d19448ec6e..fc64be079b 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -279,8 +279,8 @@ func TestNewDynamicExecutor(t *testing.T) {
name string
args args
setup func(*testing.T, *args)
- manipulator func(*testing.T, *executor)
- executorValidator func(*testing.T, *executor) bool
+ manipulator func(*testing.T, *dynamicExecutor)
+ executorValidator func(*testing.T, *dynamicExecutor) bool
}{
{
name: "new Executor",
@@ -292,7 +292,7 @@ func TestNewDynamicExecutor(t *testing.T) {
setup: func(t *testing.T, args *args) {
args.options = append(args.options, options.WithCustomLogger(zerolog.New(zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t)))))
},
- executorValidator: func(t *testing.T, e *executor) bool {
+ executorValidator: func(t *testing.T, e *dynamicExecutor) bool {
assert.False(t, e.running)
assert.False(t, e.shutdown)
assert.Len(t, e.worker, 1)
@@ -310,7 +310,7 @@ func TestNewDynamicExecutor(t *testing.T) {
setup: func(t *testing.T, args *args) {
args.options = append(args.options, options.WithCustomLogger(zerolog.New(zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t)))))
},
- manipulator: func(t *testing.T, e *executor) {
+ manipulator: func(t *testing.T, e *dynamicExecutor) {
{
oldUpScaleInterval := upScaleInterval
t.Cleanup(func() {
@@ -354,7 +354,7 @@ func TestNewDynamicExecutor(t *testing.T) {
}
}()
},
- executorValidator: func(t *testing.T, e *executor) bool {
+ executorValidator: func(t *testing.T, e *dynamicExecutor) bool {
time.Sleep(500 * time.Millisecond)
assert.False(t, e.running)
assert.False(t, e.shutdown)
@@ -367,12 +367,12 @@ func TestNewDynamicExecutor(t *testing.T) {
if tt.setup != nil {
tt.setup(t, &tt.args)
}
- fixedSizeExecutor := NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
- defer fixedSizeExecutor.Stop()
+ dynamicSizedExecutor := NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
+ defer dynamicSizedExecutor.Stop()
if tt.manipulator != nil {
- tt.manipulator(t, fixedSizeExecutor.(*executor))
+ tt.manipulator(t, dynamicSizedExecutor.(*dynamicExecutor))
}
- assert.True(t, tt.executorValidator(t, fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
+ assert.True(t, tt.executorValidator(t, dynamicSizedExecutor.(*dynamicExecutor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
})
}
}