You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/13 12:17:23 UTC
[plc4x] 02/04: refactor(plc4go/spi): split up pool into multiple files
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit ade510700aab3ef7200064501bd2f01152919aa9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 12:44:07 2023 +0200
refactor(plc4go/spi): split up pool into multiple files
---
plc4go/spi/pool/CompletionFuture.go | 90 ++++
plc4go/spi/pool/CompletionFuture_test.go | 191 +++++++++
plc4go/spi/pool/WorkerPool.go | 393 +-----------------
plc4go/spi/pool/WorkerPool_test.go | 689 -------------------------------
plc4go/spi/pool/dynamicExecutor.go | 173 ++++++++
plc4go/spi/pool/dynamicExecutor_test.go | 170 ++++++++
plc4go/spi/pool/executor.go | 129 ++++++
plc4go/spi/pool/executor_test.go | 413 ++++++++++++++++++
plc4go/spi/pool/workItem.go | 32 ++
plc4go/spi/pool/workItem_test.go | 53 +++
plc4go/spi/pool/worker.go | 96 +++++
plc4go/spi/pool/worker_test.go | 227 ++++++++++
12 files changed, 1585 insertions(+), 1071 deletions(-)
diff --git a/plc4go/spi/pool/CompletionFuture.go b/plc4go/spi/pool/CompletionFuture.go
new file mode 100644
index 0000000000..5d11002356
--- /dev/null
+++ b/plc4go/spi/pool/CompletionFuture.go
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "context"
+ "fmt"
+ "sync/atomic"
+ "time"
+
+ "github.com/pkg/errors"
+)
+
+// CompletionFuture is a handle on a unit of work whose outcome can be awaited
+// or which can be canceled. In this package it is implemented by *future.
+type CompletionFuture interface {
+ // AwaitCompletion blocks until the work finished, was canceled, or ctx is
+ // done, and reports the outcome as an error (nil on success).
+ AwaitCompletion(ctx context.Context) error
+ // Cancel requests cancellation. interrupt is recorded alongside the request;
+ // a non-nil err is what AwaitCompletion reports instead of Canceled.
+ Cancel(interrupt bool, err error)
+}
+
+// future is the CompletionFuture implementation of this package. All state is
+// held in atomics, so its methods may be called from different goroutines
+// without additional locking. The zero value is a pending future.
+type future struct {
+ // cancelRequested is set by Cancel.
+ cancelRequested atomic.Bool
+ // interruptRequested holds the interrupt argument of the latest Cancel call.
+ interruptRequested atomic.Bool
+ // completed is set by complete once the work item finished.
+ completed atomic.Bool
+ // errored is set by Cancel when it was given a non-nil error.
+ errored atomic.Bool
+ // err holds the error handed to Cancel; AwaitCompletion reads it as an error.
+ err atomic.Value
+}
+
+// Cancel requests cancellation of the work tracked by this future.
+//
+// interrupt is recorded in interruptRequested. A non-nil err marks the future
+// as errored and is what AwaitCompletion returns instead of Canceled.
+//
+// The error is published before any flag that lets AwaitCompletion stop
+// waiting is raised. Raising the flags first would allow a concurrent waiter
+// to observe the cancellation while err is still empty and to report Canceled
+// although a specific error was supplied.
+//
+// Only the first error is kept: atomic.Value panics when values of different
+// concrete types are stored, so a later Cancel with another kind of error
+// (for example a context error followed by a custom one) must not overwrite it.
+// NOTE(review): two goroutines calling Cancel at the same instant with
+// differently typed errors can still both pass the emptiness check; closing
+// that window needs a dedicated guard field on future.
+func (f *future) Cancel(interrupt bool, err error) {
+	if err != nil {
+		if f.err.Load() == nil {
+			f.err.Store(err)
+		}
+		f.errored.Store(true)
+	}
+	f.interruptRequested.Store(interrupt)
+	// Raised last so that everything above is visible once a waiter sees it.
+	f.cancelRequested.Store(true)
+}
+
+// complete marks the future as finished; a pending AwaitCompletion then
+// returns nil unless an error, a cancellation or a done context takes
+// precedence.
+func (f *future) complete() {
+ f.completed.Store(true)
+}
+
+// Canceled is returned on CompletionFuture.AwaitCompletion when a CompletionFuture was canceled
+// without a specific error (Cancel called with a nil err). It is a sentinel
+// value: callers identify it by comparing against this variable.
+var Canceled = errors.New("Canceled")
+
+// AwaitCompletion blocks until the future is settled or ctx is done,
+// whichever happens first.
+//
+// The result, in order of precedence:
+//   - ctx.Err() when ctx is done (even if the future settled as well),
+//   - the error stored on the future, if any,
+//   - Canceled when the future was canceled without an error,
+//   - nil when the work completed.
+//
+// The future state is polled every 10ms. Between two polls the call waits on
+// ctx.Done(), so a canceled context is honored immediately rather than after
+// the current pause has elapsed.
+func (f *future) AwaitCompletion(ctx context.Context) error {
+	// A stored error settles the future even when no flag was raised, which
+	// covers futures that were created with err already populated.
+	settled := func() bool {
+		return f.completed.Load() ||
+			f.errored.Load() ||
+			f.cancelRequested.Load() ||
+			f.err.Load() != nil
+	}
+
+	ticker := time.NewTicker(10 * time.Millisecond)
+	defer ticker.Stop()
+	for !settled() {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	if err, ok := f.err.Load().(error); ok {
+		return err
+	}
+	if f.cancelRequested.Load() {
+		return Canceled
+	}
+	return nil
+}
+
+func (f *future) String() string {
+ return fmt.Sprintf("future{\n"+
+ "\tcancelRequested: %t,\n"+
+ "\tinterruptRequested: %t,\n"+
+ "\tcompleted: %t,\n"+
+ "\terrored: %t,\n"+
+ "\terr: %v,\n"+
+ "}",
+ f.cancelRequested.Load(),
+ f.interruptRequested.Load(),
+ f.completed.Load(),
+ f.errored.Load(),
+ f.err.Load(),
+ )
+}
diff --git a/plc4go/spi/pool/CompletionFuture_test.go b/plc4go/spi/pool/CompletionFuture_test.go
new file mode 100644
index 0000000000..79fdaeace7
--- /dev/null
+++ b/plc4go/spi/pool/CompletionFuture_test.go
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "context"
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+// Test_future_AwaitCompletion runs AwaitCompletion on a fresh future while a
+// "completer" goroutine settles (or deliberately does not settle) it, and
+// checks the returned error for each scenario.
+func Test_future_AwaitCompletion(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ }
+ tests := []struct {
+ name string
+ args args
+ completer func(*future)
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "completes with error",
+ args: args{ctx: context.TODO()},
+ completer: func(f *future) {
+ f.Cancel(false, errors.New("Uh oh"))
+ },
+ wantErr: assert.Error,
+ },
+ {
+ name: "completes regular",
+ args: args{ctx: context.TODO()},
+ completer: func(f *future) {
+ time.Sleep(time.Millisecond * 30)
+ f.complete()
+ },
+ wantErr: assert.NoError,
+ },
+ {
+ // The context deadline (30ms) expires long before the completer returns
+ // (300ms), and the completer never settles the future at all.
+ name: "completes not int time",
+ args: args{ctx: func() context.Context {
+ deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Millisecond))
+ t.Cleanup(cancel)
+ return deadline
+ }()},
+ completer: func(f *future) {
+ time.Sleep(time.Millisecond * 300)
+ },
+ wantErr: assert.Error,
+ },
+ {
+ name: "completes canceled without error",
+ args: args{ctx: context.TODO()},
+ completer: func(f *future) {
+ time.Sleep(time.Millisecond * 300)
+ f.Cancel(true, nil)
+ },
+ // Cancel without an error must surface the Canceled sentinel itself.
+ wantErr: func(t assert.TestingT, err error, i ...any) bool {
+ assert.Same(t, Canceled, err)
+ return true
+ },
+ },
+ {
+ name: "completes canceled with particular error",
+ args: args{ctx: context.TODO()},
+ completer: func(f *future) {
+ time.Sleep(time.Millisecond * 300)
+ f.Cancel(true, errors.New("Uh oh"))
+ },
+ // The error handed to Cancel takes precedence over Canceled.
+ wantErr: func(t assert.TestingT, err error, i ...any) bool {
+ assert.Equal(t, "Uh oh", err.Error())
+ return true
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &future{}
+ // Settle the future concurrently while this goroutine waits on it.
+ go tt.completer(f)
+ tt.wantErr(t, f.AwaitCompletion(tt.args.ctx), fmt.Sprintf("AwaitCompletion(%v)", tt.args.ctx))
+ })
+ }
+}
+
+// Test_future_Cancel calls Cancel on a fresh future with the given arguments
+// and lets each case's verifier inspect the resulting flags and stored error.
+func Test_future_Cancel(t *testing.T) {
+ type args struct {
+ interrupt bool
+ err error
+ }
+ tests := []struct {
+ name string
+ args args
+ verifier func(*testing.T, *future)
+ }{
+ {
+ // Zero-value args: no interrupt, no error.
+ name: "cancel cancels",
+ verifier: func(t *testing.T, f *future) {
+ assert.True(t, f.cancelRequested.Load())
+ },
+ },
+ {
+ name: "cancel with interrupt",
+ args: args{
+ interrupt: true,
+ err: nil,
+ },
+ verifier: func(t *testing.T, f *future) {
+ assert.True(t, f.cancelRequested.Load())
+ assert.False(t, f.errored.Load())
+ assert.Nil(t, f.err.Load())
+ },
+ },
+ {
+ name: "cancel with err",
+ args: args{
+ interrupt: true,
+ err: errors.New("Uh Oh"),
+ },
+ verifier: func(t *testing.T, f *future) {
+ assert.True(t, f.cancelRequested.Load())
+ assert.True(t, f.errored.Load())
+ assert.NotNil(t, f.err.Load())
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &future{}
+ f.Cancel(tt.args.interrupt, tt.args.err)
+ tt.verifier(t, f)
+ })
+ }
+}
+
+// Test_future_String pins the exact String() output of a zero-value future.
+func Test_future_String(t *testing.T) {
+ tests := []struct {
+ name string
+ want string
+ }{
+ {
+ name: "string it",
+ want: "future{\n\tcancelRequested: false,\n\tinterruptRequested: false,\n\tcompleted: false,\n\terrored: false,\n\terr: <nil>,\n}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &future{}
+ assert.Equalf(t, tt.want, f.String(), "String()")
+ })
+ }
+}
+
+// Test_future_complete checks that complete() raises the completed flag on a
+// fresh future.
+func Test_future_complete(t *testing.T) {
+ tests := []struct {
+ name string
+ verifier func(*testing.T, *future)
+ }{
+ {
+ name: "complete completes",
+ verifier: func(t *testing.T, f *future) {
+ assert.True(t, f.completed.Load())
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &future{}
+ f.complete()
+ tt.verifier(t, f)
+ })
+ }
+}
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 85f3ef8064..d83e770dc9 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -21,97 +21,13 @@ package pool
import (
"context"
- "fmt"
"github.com/apache/plc4x/plc4go/spi/options"
- "github.com/apache/plc4x/plc4go/spi/utils"
- "github.com/pkg/errors"
- "github.com/rs/zerolog"
"io"
- "runtime/debug"
- "sync"
- "sync/atomic"
"time"
)
type Runnable func()
-type worker struct {
- id int
- shutdown atomic.Bool
- interrupted atomic.Bool
- interrupter chan struct{}
- executor interface {
- isTraceWorkers() bool
- getWorksItems() chan workItem
- getWorkerWaitGroup() *sync.WaitGroup
- }
- hasEnded atomic.Bool
- lastReceived time.Time
-
- log zerolog.Logger
-}
-
-func (w *worker) initialize() {
- w.shutdown.Store(false)
- w.interrupted.Store(false)
- w.interrupter = make(chan struct{}, 1)
- w.hasEnded.Store(false)
- w.lastReceived = time.Now()
-}
-
-func (w *worker) work() {
- w.executor.getWorkerWaitGroup().Add(1)
- defer w.executor.getWorkerWaitGroup().Done()
- defer func() {
- if err := recover(); err != nil {
- w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
- }
- if !w.shutdown.Load() {
- // if we are not in shutdown we continue
- w.work()
- }
- }()
- workerLog := w.log.With().Int("Worker id", w.id).Logger()
- if !w.executor.isTraceWorkers() {
- workerLog = zerolog.Nop()
- }
- workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
- w.hasEnded.Store(false)
- workerLog.Debug().Msgf("setting to not ended")
-
- for !w.shutdown.Load() {
- workerLog.Debug().Msg("Working")
- select {
- case _workItem := <-w.executor.getWorksItems():
- w.lastReceived = time.Now()
- workerLog.Debug().Msgf("Got work item %v", _workItem)
- if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
- workerLog.Debug().Msg("We need to stop")
- // TODO: do we need to complete with a error?
- } else {
- workerLog.Debug().Msgf("Running work item %v", _workItem)
- _workItem.runnable()
- _workItem.completionFuture.complete()
- workerLog.Debug().Msgf("work item %v completed", _workItem)
- }
- case <-w.interrupter:
- workerLog.Debug().Msg("We got interrupted")
- }
- }
- w.hasEnded.Store(true)
- workerLog.Debug().Msg("setting to ended")
-}
-
-type workItem struct {
- workItemId int32
- runnable Runnable
- completionFuture *future
-}
-
-func (w workItem) String() string {
- return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
-}
-
type Executor interface {
io.Closer
Start()
@@ -120,43 +36,6 @@ type Executor interface {
IsRunning() bool
}
-type executor struct {
- running bool
- shutdown bool
- stateChange sync.Mutex
- worker []*worker
- queueDepth int
- workItems chan workItem
- traceWorkers bool
-
- workerWaitGroup sync.WaitGroup
-
- log zerolog.Logger
-}
-
-func (e *executor) isTraceWorkers() bool {
- return e.traceWorkers
-}
-
-func (e *executor) getWorksItems() chan workItem {
- return e.workItems
-}
-
-func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
- return &e.workerWaitGroup
-}
-
-type dynamicExecutor struct {
- *executor
-
- maxNumberOfWorkers int
- currentNumberOfWorkers atomic.Int32
- dynamicStateChange sync.Mutex
- interrupter chan struct{}
-
- dynamicWorkers sync.WaitGroup
-}
-
func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
workers := make([]*worker, numberOfWorkers)
customLogger := options.ExtractCustomLogger(_options...)
@@ -184,10 +63,6 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
return _executor
}
-var upScaleInterval = 100 * time.Millisecond
-var downScaleInterval = 5 * time.Second
-var timeToBecomeUnused = 5 * time.Second
-
func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
customLogger := options.ExtractCustomLogger(_options...)
_executor := &dynamicExecutor{
@@ -219,265 +94,19 @@ func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
return &tracerWorkersOption{traceWorkers: traceWorkers}
}
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
type tracerWorkersOption struct {
options.Option
traceWorkers bool
}
-func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
- if runnable == nil {
- value := atomic.Value{}
- value.Store(errors.New("runnable must not be nil"))
- return &future{err: value}
- }
- e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
- completionFuture := &future{}
- if e.shutdown {
- completionFuture.Cancel(false, errors.New("executor in shutdown"))
- return completionFuture
- }
- select {
- case e.workItems <- workItem{
- workItemId: workItemId,
- runnable: runnable,
- completionFuture: completionFuture,
- }:
- e.log.Trace().Msg("Item added")
- case <-ctx.Done():
- completionFuture.Cancel(false, ctx.Err())
- }
-
- e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
- return completionFuture
-}
-
-func (e *executor) Start() {
- e.stateChange.Lock()
- defer e.stateChange.Unlock()
- if e.running || e.shutdown {
- e.log.Warn().Msg("Already started")
- return
- }
- e.running = true
- e.shutdown = false
- for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.initialize()
- go worker.work()
- }
-}
-
-func (e *dynamicExecutor) Start() {
- e.dynamicStateChange.Lock()
- defer e.dynamicStateChange.Unlock()
- if e.running || e.shutdown {
- e.log.Warn().Msg("Already started")
- return
- }
- if e.interrupter != nil {
- e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
- close(e.interrupter)
- e.dynamicWorkers.Wait()
- }
-
- e.executor.Start()
- mutex := sync.Mutex{}
- e.interrupter = make(chan struct{})
- // Worker spawner
- go func() {
- e.dynamicWorkers.Add(1)
- defer e.dynamicWorkers.Done()
- defer func() {
- if err := recover(); err != nil {
- e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
- }
- }()
- workerLog := e.log.With().Str("Worker type", "spawner").Logger()
- if !e.traceWorkers {
- workerLog = zerolog.Nop()
- }
- for e.running && !e.shutdown {
- workerLog.Trace().Msg("running")
- mutex.Lock()
- numberOfItemsInQueue := len(e.workItems)
- numberOfWorkers := len(e.worker)
- workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
- if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
- workerLog.Trace().Msg("spawning new worker")
- _worker := &worker{
- id: numberOfWorkers - 1,
- interrupter: make(chan struct{}, 1),
- executor: e,
- lastReceived: time.Now(),
- log: e.log,
- }
- e.worker = append(e.worker, _worker)
- _worker.initialize()
- workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
- go _worker.work()
- e.currentNumberOfWorkers.Add(1)
- } else {
- workerLog.Trace().Msg("Nothing to scale")
- }
- mutex.Unlock()
- func() {
- workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
- timer := time.NewTimer(upScaleInterval)
- defer utils.CleanupTimer(timer)
- select {
- case <-timer.C:
- case <-e.interrupter:
- workerLog.Info().Msg("interrupted")
- }
- }()
- }
- workerLog.Info().Msg("Terminated")
- }()
- // Worker killer
- go func() {
- e.dynamicWorkers.Add(1)
- defer e.dynamicWorkers.Done()
- defer func() {
- if err := recover(); err != nil {
- e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
- }
- }()
- workerLog := e.log.With().Str("Worker type", "killer").Logger()
- if !e.traceWorkers {
- workerLog = zerolog.Nop()
- }
- for e.running && !e.shutdown {
- workerLog.Trace().Msg("running")
- mutex.Lock()
- newWorkers := make([]*worker, 0)
- for _, _worker := range e.worker {
- deadline := time.Now().Add(-timeToBecomeUnused)
- workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
- if _worker.lastReceived.Before(deadline) {
- workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
- _worker.interrupted.Store(true)
- close(_worker.interrupter)
- e.currentNumberOfWorkers.Add(-1)
- } else {
- workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
- newWorkers = append(newWorkers, _worker)
- }
- }
- e.worker = newWorkers
- mutex.Unlock()
- func() {
- workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
- timer := time.NewTimer(downScaleInterval)
- defer utils.CleanupTimer(timer)
- select {
- case <-timer.C:
- case <-e.interrupter:
- workerLog.Info().Msg("interrupted")
- }
- }()
- }
- workerLog.Info().Msg("Terminated")
- }()
-}
-
-func (e *executor) Stop() {
- e.log.Trace().Msg("stopping now")
- e.stateChange.Lock()
- defer e.stateChange.Unlock()
- if !e.running || e.shutdown {
- e.log.Warn().Msg("already stopped")
- return
- }
- e.shutdown = true
- for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.shutdown.Store(true)
- worker.interrupted.Store(true)
- close(worker.interrupter)
- }
- e.running = false
- e.shutdown = false
- e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
- e.workerWaitGroup.Wait()
- e.log.Trace().Msg("stopped")
-}
-
-func (e *dynamicExecutor) Stop() {
- e.log.Trace().Msg("stopping now")
- e.dynamicStateChange.Lock()
- defer e.dynamicStateChange.Unlock()
- if !e.running || e.shutdown {
- e.log.Warn().Msg("already stopped")
- return
- }
- close(e.interrupter)
- e.log.Trace().Msg("stopping inner executor")
- e.executor.Stop()
- e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
- e.dynamicWorkers.Wait()
- e.log.Trace().Msg("stopped")
-}
-
-func (e *executor) Close() error {
- e.Stop()
- return nil
-}
-
-func (e *executor) IsRunning() bool {
- return e.running && !e.shutdown
-}
-
-type CompletionFuture interface {
- AwaitCompletion(ctx context.Context) error
- Cancel(interrupt bool, err error)
-}
-
-type future struct {
- cancelRequested atomic.Bool
- interruptRequested atomic.Bool
- completed atomic.Bool
- errored atomic.Bool
- err atomic.Value
-}
-
-func (f *future) Cancel(interrupt bool, err error) {
- f.cancelRequested.Store(true)
- f.interruptRequested.Store(interrupt)
- if err != nil {
- f.errored.Store(true)
- f.err.Store(err)
- }
-}
-
-func (f *future) complete() {
- f.completed.Store(true)
-}
-
-// Canceled is returned on CompletionFuture.AwaitCompletion when a CompletionFuture was canceled
-var Canceled = errors.New("Canceled")
-
-func (f *future) AwaitCompletion(ctx context.Context) error {
- for !f.completed.Load() && !f.errored.Load() && !f.cancelRequested.Load() && ctx.Err() == nil {
- time.Sleep(time.Millisecond * 10)
- }
- if err := ctx.Err(); err != nil {
- return err
- }
- if err, ok := f.err.Load().(error); ok {
- return err
- }
- if f.cancelRequested.Load() {
- return Canceled
- }
- return nil
-}
-
-func (f *future) String() string {
- return fmt.Sprintf("future: cancelRequested(%t), interruptRequested(%t), completed(%t), errored(%t), err(%v)",
- f.cancelRequested.Load(),
- f.interruptRequested.Load(),
- f.completed.Load(),
- f.errored.Load(),
- f.err.Load(),
- )
-}
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index 9a62a09d25..b531d94e7e 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -21,9 +21,7 @@ package pool
import (
"context"
- "fmt"
"github.com/apache/plc4x/plc4go/spi/options"
- "github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"math/rand"
@@ -32,212 +30,6 @@ import (
"time"
)
-func TestExecutor_Start(t *testing.T) {
- type fields struct {
- running bool
- shutdown bool
- worker []*worker
- queue chan workItem
- traceWorkers bool
- }
- tests := []struct {
- name string
- fields fields
- shouldRun bool
- }{
- {
- name: "Start fresh",
- shouldRun: true,
- },
- {
- name: "Start running",
- fields: fields{
- running: true,
- },
- shouldRun: true,
- },
- {
- name: "Start stopping",
- fields: fields{
- running: true,
- shutdown: true,
- },
- shouldRun: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- e := &executor{
- running: tt.fields.running,
- shutdown: tt.fields.shutdown,
- worker: tt.fields.worker,
- workItems: tt.fields.queue,
- traceWorkers: tt.fields.traceWorkers,
- }
- e.Start()
- assert.Equal(t, tt.shouldRun, e.IsRunning(), "should be running")
- })
- }
-}
-
-func TestExecutor_Stop(t *testing.T) {
- type fields struct {
- running bool
- shutdown bool
- worker []*worker
- queue chan workItem
- traceWorkers bool
- }
- tests := []struct {
- name string
- fields fields
- shouldRun bool
- }{
- {
- name: "Stop stopped",
- shouldRun: false,
- },
- {
- name: "Stop running",
- fields: fields{
- running: true,
- queue: make(chan workItem),
- worker: []*worker{
- func() *worker {
- w := &worker{}
- w.initialize()
- return w
- }(),
- },
- },
- shouldRun: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- e := &executor{
- running: tt.fields.running,
- shutdown: tt.fields.shutdown,
- worker: tt.fields.worker,
- workItems: tt.fields.queue,
- traceWorkers: tt.fields.traceWorkers,
- }
- e.Stop()
- })
- }
-}
-
-func TestExecutor_Submit(t *testing.T) {
- type fields struct {
- running bool
- shutdown bool
- worker []*worker
- queue chan workItem
- traceWorkers bool
- }
- type args struct {
- workItemId int32
- runnable Runnable
- context context.Context
- }
- tests := []struct {
- name string
- fields fields
- args args
- completionFutureValidator func(t *testing.T, future CompletionFuture) bool
- waitForCompletion bool
- }{
- {
- name: "submitting nothing",
- completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
- return assert.Error(t, completionFuture.(*future).err.Load().(error))
- },
- },
- {
- name: "submit canceled",
- fields: fields{
- queue: make(chan workItem, 0),
- },
- args: args{
- workItemId: 13,
- runnable: func() {
- // We do something for 3 seconds
- <-time.NewTimer(3 * time.Second).C
- },
- context: func() context.Context {
- ctx, cancelFunc := context.WithCancel(context.Background())
- cancelFunc()
- return ctx
- }(),
- },
- completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
- err := completionFuture.(*future).err.Load().(error)
- return assert.Error(t, err)
- },
- },
- {
- name: "Submit something which doesn't complete",
- fields: fields{
- queue: make(chan workItem, 1),
- },
- args: args{
- workItemId: 13,
- runnable: func() {
- // We do something for 3 seconds
- <-time.NewTimer(3 * time.Second).C
- },
- context: context.TODO(),
- },
- completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
- completed := completionFuture.(*future).completed.Load()
- return assert.False(t, completed)
- },
- },
- {
- name: "Submit something which does complete",
- fields: func() fields {
- var executor = NewFixedSizeExecutor(1, 1).(*executor)
- return fields{
- running: executor.running,
- shutdown: executor.shutdown,
- worker: executor.worker,
- queue: executor.workItems,
- traceWorkers: true,
- }
- }(),
- args: args{
- workItemId: 13,
- runnable: func() {
- // NOOP
- },
- context: context.TODO(),
- },
- completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
- completed := completionFuture.(*future).completed.Load()
- return assert.True(t, completed)
- },
- waitForCompletion: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- e := &executor{
- running: tt.fields.running,
- shutdown: tt.fields.shutdown,
- worker: tt.fields.worker,
- workItems: tt.fields.queue,
- traceWorkers: tt.fields.traceWorkers,
- }
- e.Start()
- completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
- if tt.waitForCompletion {
- assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
- }
- assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
- })
- }
-}
-
func TestNewFixedSizeExecutor(t *testing.T) {
type args struct {
numberOfWorkers int
@@ -407,487 +199,6 @@ func TestWithExecutorOptionTracerWorkers(t *testing.T) {
}
}
-func TestWorkItem_String(t *testing.T) {
- type fields struct {
- workItemId int32
- runnable Runnable
- completionFuture *future
- }
- tests := []struct {
- name string
- fields fields
- want string
- }{
- {
- name: "Simple test",
- want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- w := &workItem{
- workItemId: tt.fields.workItemId,
- runnable: tt.fields.runnable,
- completionFuture: tt.fields.completionFuture,
- }
- assert.Equalf(t, tt.want, w.String(), "String()")
- })
- }
-}
-
-func TestWorker_work(t *testing.T) {
- type fields struct {
- id int
- executor *executor
- }
- tests := []struct {
- name string
- fields fields
- timeBeforeFirstValidation time.Duration
- firstValidation func(*testing.T, *worker)
- timeBeforeManipulation time.Duration
- manipulator func(*worker)
- timeBeforeSecondValidation time.Duration
- secondValidation func(*testing.T, *worker)
- }{
- {
- name: "Worker should work till shutdown (even if it panics)",
- fields: fields{
- id: 0,
- executor: func() *executor {
- e := &executor{
- workItems: make(chan workItem),
- traceWorkers: true,
- }
- go func() {
- e.workItems <- workItem{
- workItemId: 0,
- runnable: func() {
- panic("Oh no what should I do???")
- },
- completionFuture: &future{},
- }
- }()
- return e
- }(),
- },
- timeBeforeFirstValidation: 50 * time.Millisecond,
- firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
- },
- manipulator: func(w *worker) {
- w.shutdown.Store(true)
- w.interrupter <- struct{}{}
- },
- timeBeforeSecondValidation: 150 * time.Millisecond,
- secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
- },
- },
- {
- name: "Worker should work till shutdown",
- fields: fields{
- id: 1,
- executor: func() *executor {
- e := &executor{
- workItems: make(chan workItem),
- traceWorkers: true,
- }
- go func() {
- e.workItems <- workItem{
- workItemId: 0,
- runnable: func() {
- time.Sleep(time.Millisecond * 70)
- },
- completionFuture: &future{},
- }
- }()
- return e
- }(),
- },
- timeBeforeFirstValidation: 50 * time.Millisecond,
- firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
- },
- manipulator: func(w *worker) {
- w.shutdown.Store(true)
- },
- timeBeforeSecondValidation: 150 * time.Millisecond,
- secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
- },
- },
- {
- name: "Work interrupted",
- fields: fields{
- id: 1,
- executor: func() *executor {
- e := &executor{
- workItems: make(chan workItem),
- traceWorkers: true,
- }
- return e
- }(),
- },
- timeBeforeFirstValidation: 50 * time.Millisecond,
- firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
- },
- manipulator: func(w *worker) {
- w.shutdown.Store(true)
- w.interrupter <- struct{}{}
- },
- timeBeforeSecondValidation: 150 * time.Millisecond,
- secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
- },
- },
- {
- name: "Work on canceled",
- fields: fields{
- id: 1,
- executor: func() *executor {
- e := &executor{
- workItems: make(chan workItem),
- traceWorkers: true,
- }
- go func() {
- completionFuture := &future{}
- completionFuture.cancelRequested.Store(true)
- e.workItems <- workItem{
- workItemId: 0,
- runnable: func() {
- time.Sleep(time.Millisecond * 70)
- },
- completionFuture: completionFuture,
- }
- }()
- return e
- }(),
- },
- timeBeforeManipulation: 50 * time.Millisecond,
- manipulator: func(w *worker) {
- w.shutdown.Store(true)
- w.interrupter <- struct{}{}
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- w := &worker{
- id: tt.fields.id,
- interrupter: make(chan struct{}, 1),
- executor: tt.fields.executor,
- }
- go w.work()
- if tt.firstValidation != nil {
- time.Sleep(tt.timeBeforeFirstValidation)
- t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
- tt.firstValidation(t, w)
- }
- if tt.manipulator != nil {
- time.Sleep(tt.timeBeforeManipulation)
- t.Logf("manipulator after %v", tt.timeBeforeManipulation)
- tt.manipulator(w)
- }
- if tt.secondValidation != nil {
- time.Sleep(tt.timeBeforeSecondValidation)
- t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
- tt.secondValidation(t, w)
- }
- })
- }
-}
-
-func Test_future_AwaitCompletion(t *testing.T) {
- type args struct {
- ctx context.Context
- }
- tests := []struct {
- name string
- args args
- completer func(*future)
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "completes with error",
- args: args{ctx: context.TODO()},
- completer: func(f *future) {
- f.Cancel(false, errors.New("Uh oh"))
- },
- wantErr: assert.Error,
- },
- {
- name: "completes regular",
- args: args{ctx: context.TODO()},
- completer: func(f *future) {
- time.Sleep(time.Millisecond * 30)
- f.complete()
- },
- wantErr: assert.NoError,
- },
- {
- name: "completes not int time",
- args: args{ctx: func() context.Context {
- deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Millisecond))
- t.Cleanup(cancel)
- return deadline
- }()},
- completer: func(f *future) {
- time.Sleep(time.Millisecond * 300)
- },
- wantErr: assert.Error,
- },
- {
- name: "completes canceled without error",
- args: args{ctx: context.TODO()},
- completer: func(f *future) {
- time.Sleep(time.Millisecond * 300)
- f.Cancel(true, nil)
- },
- wantErr: func(t assert.TestingT, err error, i ...any) bool {
- assert.Same(t, Canceled, err)
- return true
- },
- },
- {
- name: "completes canceled with particular error",
- args: args{ctx: context.TODO()},
- completer: func(f *future) {
- time.Sleep(time.Millisecond * 300)
- f.Cancel(true, errors.New("Uh oh"))
- },
- wantErr: func(t assert.TestingT, err error, i ...any) bool {
- assert.Equal(t, "Uh oh", err.Error())
- return true
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- f := &future{}
- go tt.completer(f)
- tt.wantErr(t, f.AwaitCompletion(tt.args.ctx), fmt.Sprintf("AwaitCompletion(%v)", tt.args.ctx))
- })
- }
-}
-
-func Test_future_Cancel(t *testing.T) {
- type args struct {
- interrupt bool
- err error
- }
- tests := []struct {
- name string
- args args
- verifier func(*testing.T, *future)
- }{
- {
- name: "cancel cancels",
- verifier: func(t *testing.T, f *future) {
- assert.True(t, f.cancelRequested.Load())
- },
- },
- {
- name: "cancel with interrupt",
- args: args{
- interrupt: true,
- err: nil,
- },
- verifier: func(t *testing.T, f *future) {
- assert.True(t, f.cancelRequested.Load())
- assert.False(t, f.errored.Load())
- assert.Nil(t, f.err.Load())
- },
- },
- {
- name: "cancel with err",
- args: args{
- interrupt: true,
- err: errors.New("Uh Oh"),
- },
- verifier: func(t *testing.T, f *future) {
- assert.True(t, f.cancelRequested.Load())
- assert.True(t, f.errored.Load())
- assert.NotNil(t, f.err.Load())
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- f := &future{}
- f.Cancel(tt.args.interrupt, tt.args.err)
- tt.verifier(t, f)
- })
- }
-}
-
-func Test_future_complete(t *testing.T) {
- tests := []struct {
- name string
- verifier func(*testing.T, *future)
- }{
- {
- name: "complete completes",
- verifier: func(t *testing.T, f *future) {
- assert.True(t, f.completed.Load())
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- f := &future{}
- f.complete()
- tt.verifier(t, f)
- })
- }
-}
-
-func Test_dynamicExecutor_Start(t *testing.T) {
- type fields struct {
- executor *executor
- maxNumberOfWorkers int
- }
- tests := []struct {
- name string
- fields fields
- setup func(t *testing.T, fields *fields)
- startTwice bool
- }{
- {
- name: "just start",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- },
- {
- name: "start twice",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- startTwice: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- if tt.setup != nil {
- tt.setup(t, &tt.fields)
- }
- e := &dynamicExecutor{
- executor: tt.fields.executor,
- maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
- }
- e.Start()
- if tt.startTwice {
- e.Start()
- }
- // Let it work a bit
- time.Sleep(20 * time.Millisecond)
- t.Log("done with test")
- t.Cleanup(e.Stop)
- })
- }
-}
-
-func Test_dynamicExecutor_Stop(t *testing.T) {
- type fields struct {
- executor *executor
- maxNumberOfWorkers int
- interrupter chan struct{}
- }
- tests := []struct {
- name string
- fields fields
- setup func(t *testing.T, fields *fields)
- startIt bool
- stopTwice bool
- }{
- {
- name: "just stop",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- },
- {
- name: "stop started",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- },
- {
- name: "stop twice",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- stopTwice: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- if tt.setup != nil {
- tt.setup(t, &tt.fields)
- }
- e := &dynamicExecutor{
- executor: tt.fields.executor,
- maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
- interrupter: tt.fields.interrupter,
- }
- if tt.startIt {
- e.Start()
- }
- e.Stop()
- if tt.stopTwice {
- e.Stop()
- }
- })
- }
-}
-
// from: https://github.com/golang/go/issues/36532#issuecomment-575535452
func testContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
new file mode 100644
index 0000000000..22f8d20e5b
--- /dev/null
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/rs/zerolog"
+ "runtime/debug"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// Timing of the dynamic executor's scaling goroutines. These are variables rather
+// than constants, so they can be reassigned.
+var (
+	// upScaleInterval is how long the spawner sleeps between two scale-up checks.
+	upScaleInterval = 100 * time.Millisecond
+	// downScaleInterval is how long the killer sleeps between two sweeps.
+	downScaleInterval = 5 * time.Second
+	// timeToBecomeUnused is the idle time after which the killer interrupts a worker.
+	timeToBecomeUnused = 5 * time.Second
+)
+
+// dynamicExecutor is an executor whose worker list is resized at runtime: Start launches
+// a spawner goroutine that appends workers and a killer goroutine that interrupts and
+// removes workers which have not received work for timeToBecomeUnused.
+type dynamicExecutor struct {
+ *executor
+
+ // maxNumberOfWorkers is the worker count at which the spawner stops adding workers.
+ maxNumberOfWorkers int
+ // currentNumberOfWorkers is incremented per spawned and decremented per killed worker;
+ // in this file it is only read for the log message in Stop.
+ currentNumberOfWorkers atomic.Int32
+ // dynamicStateChange serializes Start and Stop.
+ dynamicStateChange sync.Mutex
+ // interrupter is closed to wake the spawner and the killer out of their sleep.
+ interrupter chan struct{}
+
+ // dynamicWorkers counts the spawner and killer goroutines so Start and Stop can wait for them.
+ dynamicWorkers sync.WaitGroup
+}
+
+// Start starts the embedded executor and launches two background goroutines:
+//
+//   - the spawner checks every upScaleInterval whether more items are queued than
+//     workers exist and, if maxNumberOfWorkers is not reached, adds one worker;
+//   - the killer checks every downScaleInterval for workers that received nothing for
+//     timeToBecomeUnused, interrupts them and drops them from the worker list.
+//
+// Both goroutines run until the executor is no longer running and are woken early when
+// the interrupter channel is closed. Calling Start on an executor that is already
+// running (or shutting down) only logs a warning.
+func (e *dynamicExecutor) Start() {
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	if e.interrupter != nil {
+		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
+		// Stop has already closed the interrupter, and closing a channel twice panics.
+		// Nothing in this file sends on the channel, so a successful receive means it
+		// is closed already.
+		select {
+		case <-e.interrupter:
+		default:
+			close(e.interrupter)
+		}
+		e.dynamicWorkers.Wait()
+	}
+
+	e.executor.Start()
+	// mutex guards the worker list between the spawner and the killer.
+	mutex := sync.Mutex{}
+	e.interrupter = make(chan struct{})
+	// Worker spawner
+	// Add must happen before the goroutine is launched: done inside the goroutine, a
+	// Stop that runs right after Start could reach Wait before the counter was raised.
+	e.dynamicWorkers.Add(1)
+	go func() {
+		defer e.dynamicWorkers.Done()
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "spawner").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Trace().Msg("running")
+			mutex.Lock()
+			numberOfItemsInQueue := len(e.workItems)
+			numberOfWorkers := len(e.worker)
+			workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+				workerLog.Trace().Msg("spawning new worker")
+				_worker := &worker{
+					// The next free index; len-1 would yield -1 for an empty list.
+					// NOTE(review): assumes pre-existing workers are numbered from 0 - confirm.
+					id:           numberOfWorkers,
+					interrupter:  make(chan struct{}, 1),
+					executor:     e,
+					lastReceived: time.Now(),
+					log:          e.log,
+				}
+				e.worker = append(e.worker, _worker)
+				_worker.initialize()
+				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
+				go _worker.work()
+				e.currentNumberOfWorkers.Add(1)
+			} else {
+				workerLog.Trace().Msg("Nothing to scale")
+			}
+			mutex.Unlock()
+			// Sleep in a closure so the timer is cleaned up on every iteration.
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+				timer := time.NewTimer(upScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
+		}
+		workerLog.Info().Msg("Terminated")
+	}()
+	// Worker killer
+	e.dynamicWorkers.Add(1) // before the goroutine starts, see the spawner above
+	go func() {
+		defer e.dynamicWorkers.Done()
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "killer").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Trace().Msg("running")
+			mutex.Lock()
+			// Rebuild the worker list from the workers that survive this sweep.
+			newWorkers := make([]*worker, 0)
+			for _, _worker := range e.worker {
+				deadline := time.Now().Add(-timeToBecomeUnused)
+				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+				if _worker.lastReceived.Before(deadline) {
+					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
+					_worker.interrupted.Store(true)
+					close(_worker.interrupter)
+					e.currentNumberOfWorkers.Add(-1)
+				} else {
+					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
+					newWorkers = append(newWorkers, _worker)
+				}
+			}
+			e.worker = newWorkers
+			mutex.Unlock()
+			// Sleep in a closure so the timer is cleaned up on every iteration.
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+				timer := time.NewTimer(downScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
+		}
+		workerLog.Info().Msg("Terminated")
+	}()
+}
+
+// Stop wakes the spawner and the killer by closing the interrupter, stops the embedded
+// executor and then blocks until both scaling goroutines have returned. On an executor
+// that is not running it only logs a warning.
+func (e *dynamicExecutor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if active := e.running && !e.shutdown; !active {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	close(e.interrupter)
+	e.log.Trace().Msg("stopping inner executor")
+	e.executor.Stop()
+	remaining := e.currentNumberOfWorkers.Load()
+	e.log.Debug().Msgf("waiting for %d dynamic workers to stop", remaining)
+	e.dynamicWorkers.Wait()
+	e.log.Trace().Msg("stopped")
+}
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
new file mode 100644
index 0000000000..1db92a5d7f
--- /dev/null
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "testing"
+ "time"
+)
+
+// Test_dynamicExecutor_Start starts a dynamicExecutor (once or twice) with one work item
+// already queued, lets it run for 20ms and stops it during cleanup. It contains no
+// assertions: a case passes when nothing panics or hangs.
+func Test_dynamicExecutor_Start(t *testing.T) {
+ type fields struct {
+ executor *executor
+ maxNumberOfWorkers int
+ }
+ tests := []struct {
+ name string
+ fields fields
+ setup func(t *testing.T, fields *fields)
+ startTwice bool
+ }{
+ {
+ name: "just start",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ // The logger needs the subtest's t, so it is attached here instead of in the table.
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "start twice",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ startTwice: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
+ e := &dynamicExecutor{
+ executor: tt.fields.executor,
+ maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+ }
+ e.Start()
+ if tt.startTwice {
+ // The second call is expected to hit the "Already started" guard.
+ e.Start()
+ }
+ // Let it work a bit
+ time.Sleep(20 * time.Millisecond)
+ t.Log("done with test")
+ t.Cleanup(e.Stop)
+ })
+ }
+}
+
+// Test_dynamicExecutor_Stop calls Stop (once or twice) on a dynamicExecutor with one work
+// item already queued. It contains no assertions: a case passes when nothing panics or
+// hangs.
+//
+// NOTE(review): no case sets startIt, so Start is never called and every Stop here takes
+// the "already stopped" path - confirm whether "stop started" was meant to set it.
+func Test_dynamicExecutor_Stop(t *testing.T) {
+ type fields struct {
+ executor *executor
+ maxNumberOfWorkers int
+ interrupter chan struct{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ setup func(t *testing.T, fields *fields)
+ startIt bool
+ stopTwice bool
+ }{
+ {
+ name: "just stop",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ // The logger needs the subtest's t, so it is attached here instead of in the table.
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "stop started",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "stop twice",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ stopTwice: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
+ e := &dynamicExecutor{
+ executor: tt.fields.executor,
+ maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+ interrupter: tt.fields.interrupter,
+ }
+ if tt.startIt {
+ e.Start()
+ }
+ e.Stop()
+ if tt.stopTwice {
+ e.Stop()
+ }
+ })
+ }
+}
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
new file mode 100644
index 0000000000..79bddb6dc0
--- /dev/null
+++ b/plc4go/spi/pool/executor.go
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog"
+)
+
+// executor hands submitted work items to its workers through the workItems channel.
+type executor struct {
+ // running is set by Start and cleared by Stop.
+ running bool
+ // shutdown is set while Stop interrupts the workers; Submit rejects work while it is set.
+ shutdown bool
+ // stateChange serializes Start and Stop.
+ stateChange sync.Mutex
+ // worker holds the workers that Start launches and Stop interrupts.
+ worker []*worker
+ // queueDepth is not read by any method in this file.
+ // NOTE(review): presumably the capacity workItems was created with - confirm in the constructor.
+ queueDepth int
+ // workItems is the queue Submit writes to and the workers read from.
+ workItems chan workItem
+ // traceWorkers is handed to the workers via isTraceWorkers; when false they log nothing.
+ traceWorkers bool
+
+ // workerWaitGroup is what Stop waits on; workers register themselves in work.
+ workerWaitGroup sync.WaitGroup
+
+ log zerolog.Logger
+}
+
+// isTraceWorkers reports whether workers of this executor should produce log output.
+func (e *executor) isTraceWorkers() bool {
+	enabled := e.traceWorkers
+	return enabled
+}
+
+// getWorksItems returns the channel the workers receive their work items from.
+func (e *executor) getWorksItems() chan workItem {
+	queue := e.workItems
+	return queue
+}
+
+// getWorkerWaitGroup returns a pointer to the wait group that Stop waits on, so that
+// workers can register themselves with it.
+func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
+	waitGroup := &e.workerWaitGroup
+	return waitGroup
+}
+
+// Submit queues runnable for execution by one of the workers and returns a future for it.
+//
+// The returned future is already failed or cancelled when
+//   - runnable is nil (the future carries an error),
+//   - the executor is shutting down, or
+//   - ctx is done before the item could be placed into the queue.
+//
+// Submit blocks while the queue is full, until either a slot becomes free or ctx is done.
+func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
+	if runnable == nil {
+		value := atomic.Value{}
+		value.Store(errors.New("runnable must not be nil"))
+		return &future{err: value}
+	}
+	e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
+	completionFuture := &future{}
+	if e.shutdown {
+		completionFuture.Cancel(false, errors.New("executor in shutdown"))
+		return completionFuture
+	}
+	select {
+	case e.workItems <- workItem{
+		workItemId:       workItemId,
+		runnable:         runnable,
+		completionFuture: completionFuture,
+	}:
+		e.log.Trace().Msg("Item added")
+		// Only report the item as queued when it really was queued.
+		e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
+	case <-ctx.Done():
+		completionFuture.Cancel(false, ctx.Err())
+		e.log.Trace().Int32("workItemId", workItemId).Msg("runnable not queued, context done")
+	}
+
+	return completionFuture
+}
+
+// Start marks the executor as running and launches one goroutine per configured worker,
+// re-initializing each worker first. On an executor that is already running or shutting
+// down it only logs a warning.
+func (e *executor) Start() {
+	e.stateChange.Lock()
+	defer e.stateChange.Unlock()
+	if alreadyActive := e.running || e.shutdown; alreadyActive {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	e.running, e.shutdown = true, false
+	for idx := 0; idx < len(e.worker); idx++ {
+		w := e.worker[idx]
+		w.initialize()
+		go w.work()
+	}
+}
+
+// Stop flags every worker for shutdown, closes its interrupter channel and then blocks
+// until the worker wait group is drained. On an executor that is not running it only
+// logs a warning.
+func (e *executor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.stateChange.Lock()
+	defer e.stateChange.Unlock()
+	if active := e.running && !e.shutdown; !active {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	e.shutdown = true
+	for idx := 0; idx < len(e.worker); idx++ {
+		w := e.worker[idx]
+		w.shutdown.Store(true)
+		w.interrupted.Store(true)
+		close(w.interrupter)
+	}
+	// Both flags are reset before waiting, exactly as the state is expected afterwards.
+	e.running, e.shutdown = false, false
+	workerCount := len(e.worker)
+	e.log.Debug().Msgf("waiting for %d workers to stop", workerCount)
+	e.workerWaitGroup.Wait()
+	e.log.Trace().Msg("stopped")
+}
+
+// Close stops the executor. It always returns nil.
+func (e *executor) Close() (err error) {
+	e.Stop()
+	return err
+}
+
+// IsRunning reports whether the executor has been started and is not shutting down.
+func (e *executor) IsRunning() bool {
+	if e.shutdown {
+		return false
+	}
+	return e.running
+}
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
new file mode 100644
index 0000000000..2a43b1366e
--- /dev/null
+++ b/plc4go/spi/pool/executor_test.go
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "context"
+ "fmt"
+ "github.com/rs/zerolog"
+ "github.com/stretchr/testify/assert"
+ "sync"
+ "testing"
+ "time"
+)
+
+// Test_executor_Close checks that Close on a zero-value executor returns no error.
+func Test_executor_Close(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	type testCase struct {
+		name    string
+		fields  fields
+		wantErr assert.ErrorAssertionFunc
+	}
+	testCases := []testCase{
+		{
+			name:    "close it",
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			underTest := &executor{
+				running:      tc.fields.running,
+				shutdown:     tc.fields.shutdown,
+				worker:       tc.fields.worker,
+				queueDepth:   tc.fields.queueDepth,
+				workItems:    tc.fields.workItems,
+				traceWorkers: tc.fields.traceWorkers,
+				log:          tc.fields.log,
+			}
+			closeErr := underTest.Close()
+			tc.wantErr(t, closeErr, fmt.Sprintf("Close()"))
+		})
+	}
+}
+
+// Test_executor_IsRunning checks that a zero-value executor reports that it is not running.
+func Test_executor_IsRunning(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		want   bool
+	}
+	testCases := []testCase{
+		{
+			name: "no",
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			underTest := &executor{
+				running:      tc.fields.running,
+				shutdown:     tc.fields.shutdown,
+				worker:       tc.fields.worker,
+				queueDepth:   tc.fields.queueDepth,
+				workItems:    tc.fields.workItems,
+				traceWorkers: tc.fields.traceWorkers,
+				log:          tc.fields.log,
+			}
+			got := underTest.IsRunning()
+			assert.Equalf(t, tc.want, got, "IsRunning()")
+		})
+	}
+}
+
+// Test_executor_Start calls Start on executors in different initial states and checks
+// what IsRunning reports afterwards: a fresh or already running executor is running, one
+// that has the shutdown flag set is not.
+func Test_executor_Start(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	type testCase struct {
+		name      string
+		fields    fields
+		shouldRun bool
+	}
+	testCases := []testCase{
+		{
+			name:      "Start fresh",
+			shouldRun: true,
+		},
+		{
+			name:      "Start running",
+			fields:    fields{running: true},
+			shouldRun: true,
+		},
+		{
+			name:      "Start stopping",
+			fields:    fields{running: true, shutdown: true},
+			shouldRun: false,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			underTest := &executor{
+				running:      tc.fields.running,
+				shutdown:     tc.fields.shutdown,
+				worker:       tc.fields.worker,
+				workItems:    tc.fields.queue,
+				traceWorkers: tc.fields.traceWorkers,
+			}
+			underTest.Start()
+			isRunning := underTest.IsRunning()
+			assert.Equal(t, tc.shouldRun, isRunning, "should be running")
+		})
+	}
+}
+
+// Test_executor_Stop calls Stop on a stopped executor and on a running one with a single
+// initialized worker. Nothing is asserted (shouldRun is declared but never checked): a
+// case passes when Stop returns without panicking or hanging.
+func Test_executor_Stop(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	type testCase struct {
+		name      string
+		fields    fields
+		shouldRun bool
+	}
+	// Stop closes the worker's interrupter channel, so the worker has to be initialized.
+	newInitializedWorker := func() *worker {
+		w := &worker{}
+		w.initialize()
+		return w
+	}
+	testCases := []testCase{
+		{
+			name:      "Stop stopped",
+			shouldRun: false,
+		},
+		{
+			name: "Stop running",
+			fields: fields{
+				running: true,
+				queue:   make(chan workItem),
+				worker:  []*worker{newInitializedWorker()},
+			},
+			shouldRun: false,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			underTest := &executor{
+				running:      tc.fields.running,
+				shutdown:     tc.fields.shutdown,
+				worker:       tc.fields.worker,
+				workItems:    tc.fields.queue,
+				traceWorkers: tc.fields.traceWorkers,
+			}
+			underTest.Stop()
+		})
+	}
+}
+
+// Test_executor_Submit starts an executor, submits one runnable and validates the
+// returned future with a per-case validator. Cases with waitForCompletion set first wait
+// for the future to complete.
+//
+// NOTE(review): none of the cases stops the executor afterwards - confirm this is intended.
+func Test_executor_Submit(t *testing.T) {
+ type fields struct {
+ running bool
+ shutdown bool
+ worker []*worker
+ queue chan workItem
+ traceWorkers bool
+ }
+ type args struct {
+ workItemId int32
+ runnable Runnable
+ context context.Context
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ completionFutureValidator func(t *testing.T, future CompletionFuture) bool
+ waitForCompletion bool
+ }{
+ {
+ // No runnable is passed, so Submit has to hand back a future carrying an error.
+ name: "submitting nothing",
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ return assert.Error(t, completionFuture.(*future).err.Load().(error))
+ },
+ },
+ {
+ // Unbuffered queue without workers plus an already cancelled context: the item
+ // cannot be queued, so the future has to carry the context error.
+ name: "submit canceled",
+ fields: fields{
+ queue: make(chan workItem, 0),
+ },
+ args: args{
+ workItemId: 13,
+ runnable: func() {
+ // We do something for 3 seconds
+ <-time.NewTimer(3 * time.Second).C
+ },
+ context: func() context.Context {
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ cancelFunc()
+ return ctx
+ }(),
+ },
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ err := completionFuture.(*future).err.Load().(error)
+ return assert.Error(t, err)
+ },
+ },
+ {
+ // The item fits into the buffered queue, but there is no worker to run it.
+ name: "Submit something which doesn't complete",
+ fields: fields{
+ queue: make(chan workItem, 1),
+ },
+ args: args{
+ workItemId: 13,
+ runnable: func() {
+ // We do something for 3 seconds
+ <-time.NewTimer(3 * time.Second).C
+ },
+ context: context.TODO(),
+ },
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ completed := completionFuture.(*future).completed.Load()
+ return assert.False(t, completed)
+ },
+ },
+ {
+ // Workers and queue are taken from a real fixed size executor, so the no-op
+ // runnable is actually picked up and completed.
+ name: "Submit something which does complete",
+ fields: func() fields {
+ var executor = NewFixedSizeExecutor(1, 1).(*executor)
+ return fields{
+ running: executor.running,
+ shutdown: executor.shutdown,
+ worker: executor.worker,
+ queue: executor.workItems,
+ traceWorkers: true,
+ }
+ }(),
+ args: args{
+ workItemId: 13,
+ runnable: func() {
+ // NOOP
+ },
+ context: context.TODO(),
+ },
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ completed := completionFuture.(*future).completed.Load()
+ return assert.True(t, completed)
+ },
+ waitForCompletion: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ e := &executor{
+ running: tt.fields.running,
+ shutdown: tt.fields.shutdown,
+ worker: tt.fields.worker,
+ workItems: tt.fields.queue,
+ traceWorkers: tt.fields.traceWorkers,
+ }
+ e.Start()
+ completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
+ if tt.waitForCompletion {
+ assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
+ }
+ assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
+ })
+ }
+}
+
+// Test_executor_getWorkerWaitGroup checks that a zero-value executor hands out a wait
+// group that equals a fresh, unused one.
+func Test_executor_getWorkerWaitGroup(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		want   *sync.WaitGroup
+	}
+	testCases := []testCase{
+		{
+			name: "get it",
+			want: &sync.WaitGroup{},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			underTest := &executor{
+				running:      tc.fields.running,
+				shutdown:     tc.fields.shutdown,
+				worker:       tc.fields.worker,
+				queueDepth:   tc.fields.queueDepth,
+				workItems:    tc.fields.workItems,
+				traceWorkers: tc.fields.traceWorkers,
+				log:          tc.fields.log,
+			}
+			got := underTest.getWorkerWaitGroup()
+			assert.Equalf(t, tc.want, got, "getWorkerWaitGroup()")
+		})
+	}
+}
+
+// Test_executor_getWorksItems checks that a zero-value executor returns a nil work item channel.
+func Test_executor_getWorksItems(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		want   chan workItem
+	}
+	testCases := []testCase{
+		{
+			name: "get it",
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			underTest := &executor{
+				running:      tc.fields.running,
+				shutdown:     tc.fields.shutdown,
+				worker:       tc.fields.worker,
+				queueDepth:   tc.fields.queueDepth,
+				workItems:    tc.fields.workItems,
+				traceWorkers: tc.fields.traceWorkers,
+				log:          tc.fields.log,
+			}
+			got := underTest.getWorksItems()
+			assert.Equalf(t, tc.want, got, "getWorksItems()")
+		})
+	}
+}
+
+// Test_executor_isTraceWorkers checks that a zero-value executor does not trace its workers.
+func Test_executor_isTraceWorkers(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		want   bool
+	}
+	testCases := []testCase{
+		{
+			name: "it is not",
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			underTest := &executor{
+				running:      tc.fields.running,
+				shutdown:     tc.fields.shutdown,
+				worker:       tc.fields.worker,
+				queueDepth:   tc.fields.queueDepth,
+				workItems:    tc.fields.workItems,
+				traceWorkers: tc.fields.traceWorkers,
+				log:          tc.fields.log,
+			}
+			got := underTest.isTraceWorkers()
+			assert.Equalf(t, tc.want, got, "isTraceWorkers()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/workItem.go b/plc4go/spi/pool/workItem.go
new file mode 100644
index 0000000000..942480bc1a
--- /dev/null
+++ b/plc4go/spi/pool/workItem.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import "fmt"
+
+// workItem is one queued unit of work: the runnable to execute together with the future
+// that is completed once it ran. The tests build it with positional literals, so the
+// field order must not change.
+type workItem struct {
+ // workItemId is the caller-supplied id passed to Submit.
+ workItemId int32
+ // runnable is the function a worker executes.
+ runnable Runnable
+ // completionFuture is the future returned to the submitter.
+ completionFuture *future
+}
+
+// String renders the work item id, whether a runnable is set, and the completion future.
+//
+// NOTE(review): the format string carries a stray '}' after "runnable(...)". It is kept
+// as is because Test_workItem_String pins this exact output.
+func (w workItem) String() string {
+	hasRunnable := w.runnable != nil
+	return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, hasRunnable, w.completionFuture)
+}
diff --git a/plc4go/spi/pool/workItem_test.go b/plc4go/spi/pool/workItem_test.go
new file mode 100644
index 0000000000..ebc4b30c5c
--- /dev/null
+++ b/plc4go/spi/pool/workItem_test.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// Test_workItem_String pins the string representation of a zero-value workItem.
+func Test_workItem_String(t *testing.T) {
+	type fields struct {
+		workItemId       int32
+		runnable         Runnable
+		completionFuture *future
+	}
+	type testCase struct {
+		name   string
+		fields fields
+		want   string
+	}
+	testCases := []testCase{
+		{
+			name: "Simple test",
+			want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			item := &workItem{
+				workItemId:       tc.fields.workItemId,
+				runnable:         tc.fields.runnable,
+				completionFuture: tc.fields.completionFuture,
+			}
+			got := item.String()
+			assert.Equalf(t, tc.want, got, "String()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
new file mode 100644
index 0000000000..7530ed0ad9
--- /dev/null
+++ b/plc4go/spi/pool/worker.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "runtime/debug"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/rs/zerolog"
+)
+
+// worker takes work items from its executor's queue and runs them, see work.
+type worker struct {
+ // id labels the worker in log output.
+ id int
+ // shutdown ends the loop in work once it is true.
+ shutdown atomic.Bool
+ // interrupted, together with shutdown, makes work skip an item it has just received.
+ interrupted atomic.Bool
+ // interrupter wakes work out of its blocking select; the executors close it.
+ interrupter chan struct{}
+ // executor is the part of the owning executor that work relies on.
+ executor interface {
+ isTraceWorkers() bool
+ getWorksItems() chan workItem
+ getWorkerWaitGroup() *sync.WaitGroup
+ }
+ // hasEnded is false while work is looping and true once the loop has been left.
+ hasEnded atomic.Bool
+ // lastReceived is set by initialize and every time work receives an item.
+ lastReceived time.Time
+
+ log zerolog.Logger
+}
+
+// initialize puts the worker back into its start state: all flags cleared, a fresh
+// interrupter channel and lastReceived set to the current time.
+func (w *worker) initialize() {
+	for _, flag := range []*atomic.Bool{&w.shutdown, &w.interrupted, &w.hasEnded} {
+		flag.Store(false)
+	}
+	w.interrupter = make(chan struct{}, 1)
+	w.lastReceived = time.Now()
+}
+
+// work is the worker's main loop. It registers with the executor's wait group, then
+// receives work items and runs them until the worker is shut down.
+//
+// A received item is skipped when its future was cancelled or when the worker is both
+// shut down and interrupted. A panic inside a runnable is logged and, unless the worker
+// is shut down, the loop is entered again.
+//
+// An interrupt that comes with the interrupted flag set ends the loop. The dynamic
+// executor's killer sets that flag and closes the interrupter without setting shutdown;
+// receiving from the closed channel succeeds on every iteration, so without ending here
+// the worker would spin forever and never leave the wait group.
+//
+// NOTE(review): the wait group is incremented from inside the goroutine (callers use
+// "go worker.work()"), so a Wait that runs before this point does not wait for this
+// worker - confirm whether callers should call Add before launching.
+func (w *worker) work() {
+	w.executor.getWorkerWaitGroup().Add(1)
+	defer w.executor.getWorkerWaitGroup().Done()
+	defer func() {
+		if err := recover(); err != nil {
+			w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+		}
+		if !w.shutdown.Load() {
+			// if we are not in shutdown we continue
+			w.work()
+		}
+	}()
+	workerLog := w.log.With().Int("Worker id", w.id).Logger()
+	if !w.executor.isTraceWorkers() {
+		workerLog = zerolog.Nop()
+	}
+	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
+	w.hasEnded.Store(false)
+	workerLog.Debug().Msgf("setting to not ended")
+
+	for !w.shutdown.Load() {
+		workerLog.Debug().Msg("Working")
+		select {
+		case _workItem := <-w.executor.getWorksItems():
+			w.lastReceived = time.Now()
+			workerLog.Debug().Msgf("Got work item %v", _workItem)
+			if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
+				workerLog.Debug().Msg("We need to stop")
+				// TODO: do we need to complete with a error?
+			} else {
+				workerLog.Debug().Msgf("Running work item %v", _workItem)
+				_workItem.runnable()
+				_workItem.completionFuture.complete()
+				workerLog.Debug().Msgf("work item %v completed", _workItem)
+			}
+		case <-w.interrupter:
+			workerLog.Debug().Msg("We got interrupted")
+			if w.interrupted.Load() {
+				// Setting shutdown leaves the loop and keeps the deferred restart from firing.
+				w.shutdown.Store(true)
+			}
+		}
+	}
+	w.hasEnded.Store(true)
+	workerLog.Debug().Msg("setting to ended")
+}
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
new file mode 100644
index 0000000000..4f604d2efb
--- /dev/null
+++ b/plc4go/spi/pool/worker_test.go
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/rs/zerolog"
+ "github.com/stretchr/testify/assert"
+)
+
+// Test_worker_initialize verifies that initialize resets a worker: all
+// lifecycle flags are cleared, an interrupter channel with a buffer of one is
+// created and lastReceived is set to the current time.
+func Test_worker_initialize(t *testing.T) {
+	type fields struct {
+		id          int
+		interrupter chan struct{}
+		executor    interface {
+			isTraceWorkers() bool
+			getWorksItems() chan workItem
+			getWorkerWaitGroup() *sync.WaitGroup
+		}
+		lastReceived time.Time
+		log          zerolog.Logger
+	}
+	tests := []struct {
+		name   string
+		fields fields
+	}{
+		{
+			name: "do it",
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:           tt.fields.id,
+				interrupter:  tt.fields.interrupter,
+				executor:     tt.fields.executor,
+				lastReceived: tt.fields.lastReceived,
+				log:          tt.fields.log,
+			}
+			// Dirty the flags first so that the assertions below prove that
+			// initialize really resets them.
+			w.shutdown.Store(true)
+			w.interrupted.Store(true)
+			w.hasEnded.Store(true)
+			before := time.Now()
+
+			w.initialize()
+
+			assert.False(t, w.shutdown.Load(), "shutdown should be reset")
+			assert.False(t, w.interrupted.Load(), "interrupted should be reset")
+			assert.False(t, w.hasEnded.Load(), "hasEnded should be reset")
+			assert.NotNil(t, w.interrupter, "interrupter should be created")
+			assert.Equal(t, 1, cap(w.interrupter), "interrupter should buffer exactly one interrupt")
+			assert.False(t, w.lastReceived.Before(before), "lastReceived should be set to the current time")
+		})
+	}
+}
+
+// Test_worker_work drives a worker through its lifecycle. Each case starts the
+// worker in its own goroutine and then runs up to three timed steps: a first
+// validation, a manipulation of the worker and a second validation.
+func Test_worker_work(t *testing.T) {
+	type fields struct {
+		id       int
+		executor *executor
+	}
+	// Receives a value if the runnable of the canceled work item is executed,
+	// which must never happen. The buffer keeps that send from blocking.
+	canceledRunnableExecuted := make(chan struct{}, 1)
+	tests := []struct {
+		name                       string
+		fields                     fields
+		timeBeforeFirstValidation  time.Duration
+		firstValidation            func(*testing.T, *worker)
+		timeBeforeManipulation     time.Duration
+		manipulator                func(*worker)
+		timeBeforeSecondValidation time.Duration
+		secondValidation           func(*testing.T, *worker)
+	}{
+		{
+			// A panicking runnable must not end the worker.
+			name: "Worker should work till shutdown (even if it panics)",
+			fields: fields{
+				id: 0,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								panic("Oh no what should I do???")
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			// Shutdown is requested while the runnable is still sleeping; no
+			// interrupt is sent, the worker ends once the runnable returns.
+			name: "Worker should work till shutdown",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								time.Sleep(time.Millisecond * 70)
+							},
+							completionFuture: &future{},
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			// No work item is ever queued; the interrupt wakes the idle worker.
+			name: "Work interrupted",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			// The work item is already canceled when the worker receives it.
+			name: "Work on canceled",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						workItems:    make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						completionFuture := &future{}
+						completionFuture.cancelRequested.Store(true)
+						e.workItems <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								canceledRunnableExecuted <- struct{}{}
+								time.Sleep(time.Millisecond * 70)
+							},
+							completionFuture: completionFuture,
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeManipulation: 50 * time.Millisecond,
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.Empty(t, canceledRunnableExecuted, "canceled work item should not be executed")
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			w := &worker{
+				id:          tt.fields.id,
+				interrupter: make(chan struct{}, 1),
+				executor:    tt.fields.executor,
+			}
+			go w.work()
+			if tt.firstValidation != nil {
+				time.Sleep(tt.timeBeforeFirstValidation)
+				t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
+				tt.firstValidation(t, w)
+			}
+			if tt.manipulator != nil {
+				time.Sleep(tt.timeBeforeManipulation)
+				t.Logf("manipulator after %v", tt.timeBeforeManipulation)
+				tt.manipulator(w)
+			}
+			if tt.secondValidation != nil {
+				time.Sleep(tt.timeBeforeSecondValidation)
+				t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
+				tt.secondValidation(t, w)
+			}
+		})
+	}
+}