You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/15 13:37:30 UTC
[plc4x] branch develop updated: test(plc4go/spi): add more tests for worker pool
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new a586784764 test(plc4go/spi): add more tests for worker pool
a586784764 is described below
commit a5867847646ebd6758a901df1a25df718aace977
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon May 15 15:37:21 2023 +0200
test(plc4go/spi): add more tests for worker pool
---
plc4go/spi/utils/WorkerPool.go | 76 ++++++++-----
plc4go/spi/utils/WorkerPool_test.go | 208 ++++++++++++++++++++++++++++++++----
2 files changed, 240 insertions(+), 44 deletions(-)
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index 578b0d38e8..a2247065e9 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -39,7 +39,7 @@ type worker struct {
interrupted atomic.Bool
interrupter chan struct{}
executor *executor
- hasEnded bool
+ hasEnded atomic.Bool
lastReceived time.Time
}
@@ -47,7 +47,7 @@ func (w *worker) initialize() {
w.shutdown.Store(false)
w.interrupted.Store(false)
w.interrupter = make(chan struct{})
- w.hasEnded = false
+ w.hasEnded.Store(false)
w.lastReceived = time.Now()
}
@@ -61,11 +61,13 @@ func (w *worker) work() {
w.work()
}
}()
- w.hasEnded = false
workerLog := log.With().Int("Worker id", w.id).Logger()
if !w.executor.traceWorkers {
workerLog = zerolog.Nop()
}
+ workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
+ w.hasEnded.Store(false)
+ workerLog.Debug().Msgf("setting to not ended")
for !w.shutdown.Load() {
workerLog.Debug().Msg("Working")
@@ -83,13 +85,11 @@ func (w *worker) work() {
workerLog.Debug().Msgf("work item %v completed", _workItem)
}
case <-w.interrupter:
- log.Debug().Msg("We got interrupted")
- default:
- workerLog.Debug().Msgf("Idling")
- time.Sleep(time.Millisecond * 10)
+ workerLog.Debug().Msg("We got interrupted")
}
}
- w.hasEnded = true
+ w.hasEnded.Store(true)
+ workerLog.Debug().Msg("setting to ended")
}
type workItem struct {
@@ -140,6 +140,10 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options ...ExecutorOp
return _executor
}
+var upScaleInterval = 100 * time.Millisecond
+var downScaleInterval = 5 * time.Second
+var timeToBecomeUnused = 5 * time.Second
+
func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorOption) Executor {
_executor := &executor{
maxNumberOfWorkers: maxNumberOfWorkers,
@@ -151,8 +155,10 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
}
// We spawn one initial worker
_executor.worker = append(_executor.worker, &worker{
- id: 0,
- executor: _executor,
+ id: 0,
+ interrupter: make(chan struct{}, 1),
+ executor: _executor,
+ lastReceived: time.Now(),
})
mutex := sync.Mutex{}
// Worker spawner
@@ -162,17 +168,30 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
log.Error().Msgf("panic-ed %v", err)
}
}()
+ workerLog := log.With().Str("Worker type", "spawner").Logger()
+ if !_executor.traceWorkers {
+ workerLog = zerolog.Nop()
+ }
for {
- time.Sleep(100 * time.Millisecond)
+ workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+ time.Sleep(upScaleInterval)
mutex.Lock()
- if len(_executor.queue) > len(_executor.worker) && len(_executor.worker) < maxNumberOfWorkers {
- worker := &worker{
- id: len(_executor.worker) - 1,
- executor: _executor,
+ numberOfItemsInQueue := len(_executor.queue)
+ numberOfWorkers := len(_executor.worker)
+ workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, maxNumberOfWorkers)
+ if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < maxNumberOfWorkers {
+ _worker := &worker{
+ id: numberOfWorkers - 1,
+ interrupter: make(chan struct{}, 1),
+ executor: _executor,
+ lastReceived: time.Now(),
}
- _executor.worker = append(_executor.worker, worker)
- worker.initialize()
- go worker.work()
+ _executor.worker = append(_executor.worker, _worker)
+ _worker.initialize()
+ workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
+ go _worker.work()
+ } else {
+ workerLog.Trace().Msg("Nothing to scale")
}
mutex.Unlock()
}
@@ -184,16 +203,25 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
log.Error().Msgf("panic-ed %v", err)
}
}()
+ workerLog := log.With().Str("Worker type", "killer").Logger()
+ if !_executor.traceWorkers {
+ workerLog = zerolog.Nop()
+ }
for {
- time.Sleep(5 * time.Second)
+ workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+ time.Sleep(downScaleInterval)
mutex.Lock()
newWorkers := make([]*worker, 0)
- for _, worker := range _executor.worker {
- if worker.lastReceived.Before(time.Now().Add(-5 * time.Second)) {
- worker.interrupted.Store(true)
- close(worker.interrupter)
+ for _, _worker := range _executor.worker {
+ deadline := time.Now().Add(-timeToBecomeUnused)
+ workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+ if _worker.lastReceived.Before(deadline) {
+ workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
+ _worker.interrupted.Store(true)
+ close(_worker.interrupter)
} else {
- newWorkers = append(newWorkers, worker)
+ workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
+ newWorkers = append(newWorkers, _worker)
}
}
_executor.worker = newWorkers
diff --git a/plc4go/spi/utils/WorkerPool_test.go b/plc4go/spi/utils/WorkerPool_test.go
index ade30c2fb7..fcb006b982 100644
--- a/plc4go/spi/utils/WorkerPool_test.go
+++ b/plc4go/spi/utils/WorkerPool_test.go
@@ -24,6 +24,7 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
+ "math/rand"
"testing"
"time"
)
@@ -98,6 +99,13 @@ func TestExecutor_Stop(t *testing.T) {
fields: fields{
running: true,
queue: make(chan workItem),
+ worker: []*worker{
+ func() *worker {
+ w := &worker{}
+ w.initialize()
+ return w
+ }(),
+ },
},
shouldRun: false,
},
@@ -133,9 +141,37 @@ func TestExecutor_Submit(t *testing.T) {
name string
fields fields
args args
- completionFutureValidator func(CompletionFuture) bool
+ completionFutureValidator func(t *testing.T, future CompletionFuture) bool
waitForCompletion bool
}{
+ {
+ name: "submitting nothing",
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ return assert.Error(t, completionFuture.(*future).err.Load().(error))
+ },
+ },
+ {
+ name: "submit canceled",
+ fields: fields{
+ queue: make(chan workItem, 0),
+ },
+ args: args{
+ workItemId: 13,
+ runnable: func() {
+ // We do something for 3 seconds
+ <-time.NewTimer(3 * time.Second).C
+ },
+ context: func() context.Context {
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ cancelFunc()
+ return ctx
+ }(),
+ },
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ err := completionFuture.(*future).err.Load().(error)
+ return assert.Error(t, err)
+ },
+ },
{
name: "Submit something which doesn't complete",
fields: fields{
@@ -149,9 +185,9 @@ func TestExecutor_Submit(t *testing.T) {
},
context: context.TODO(),
},
- completionFutureValidator: func(completionFuture CompletionFuture) bool {
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
completed := completionFuture.(*future).completed.Load()
- return !completed
+ return assert.False(t, completed)
},
},
{
@@ -173,9 +209,9 @@ func TestExecutor_Submit(t *testing.T) {
},
context: context.TODO(),
},
- completionFutureValidator: func(completionFuture CompletionFuture) bool {
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
completed := completionFuture.(*future).completed.Load()
- return completed
+ return assert.True(t, completed)
},
waitForCompletion: true,
},
@@ -194,7 +230,7 @@ func TestExecutor_Submit(t *testing.T) {
if tt.waitForCompletion {
assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
}
- assert.True(t, tt.completionFutureValidator(completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
+ assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
})
}
}
@@ -215,7 +251,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
args: args{
numberOfWorkers: 13,
queueDepth: 14,
- options: nil,
+ options: []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
},
executorValidator: func(t *testing.T, e *executor) bool {
return !e.running && !e.shutdown && len(e.worker) == 13 && cap(e.queue) == 14
@@ -225,6 +261,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fixedSizeExecutor := NewFixedSizeExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
+ defer fixedSizeExecutor.Stop()
assert.True(t, tt.executorValidator(t, fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
})
}
@@ -239,6 +276,7 @@ func TestNewDynamicExecutor(t *testing.T) {
tests := []struct {
name string
args args
+ manipulator func(*testing.T, *executor)
executorValidator func(*testing.T, *executor) bool
}{
{
@@ -246,7 +284,7 @@ func TestNewDynamicExecutor(t *testing.T) {
args: args{
numberOfWorkers: 13,
queueDepth: 14,
- options: nil,
+ options: []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
},
executorValidator: func(t *testing.T, e *executor) bool {
assert.False(t, e.running)
@@ -256,10 +294,72 @@ func TestNewDynamicExecutor(t *testing.T) {
return true
},
},
+ {
+ name: "test scaling",
+ args: args{
+ numberOfWorkers: 2,
+ queueDepth: 2,
+ options: []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
+ },
+ manipulator: func(t *testing.T, e *executor) {
+ {
+ oldUpScaleInterval := upScaleInterval
+ t.Cleanup(func() {
+ t.Logf("Ressetting up scale interval to %v", oldUpScaleInterval)
+ upScaleInterval = oldUpScaleInterval
+ })
+ upScaleInterval = 10 * time.Millisecond
+ t.Logf("Changed up scale interval to %v", upScaleInterval)
+ }
+ {
+ oldDownScaleInterval := downScaleInterval
+ t.Cleanup(func() {
+ t.Logf("Ressetting down scale interval to %v", oldDownScaleInterval)
+ downScaleInterval = oldDownScaleInterval
+ })
+ downScaleInterval = 10 * time.Millisecond
+ t.Logf("Changed down scale interval to %v", downScaleInterval)
+ }
+ {
+ oldTimeToBecomeUnused := timeToBecomeUnused
+ t.Cleanup(func() {
+ t.Logf("Ressetting time to be become unused to %v", oldTimeToBecomeUnused)
+ timeToBecomeUnused = oldTimeToBecomeUnused
+ })
+ timeToBecomeUnused = 100 * time.Millisecond
+ }
+ t.Log("fill some jobs")
+ go func() {
+ for i := 0; i < 500; i++ {
+ e.queue <- workItem{
+ workItemId: int32(i),
+ runnable: func() {
+ max := 100
+ min := 10
+ sleepTime := time.Duration(rand.Intn(max-min)+min) * time.Millisecond
+ t.Logf("Sleeping for %v", sleepTime)
+ time.Sleep(sleepTime)
+ },
+ completionFuture: &future{},
+ }
+ }
+ }()
+ },
+ executorValidator: func(t *testing.T, e *executor) bool {
+ time.Sleep(500 * time.Millisecond)
+ assert.False(t, e.running)
+ assert.False(t, e.shutdown)
+ return true
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fixedSizeExecutor := NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
+ defer fixedSizeExecutor.Stop()
+ if tt.manipulator != nil {
+ tt.manipulator(t, fixedSizeExecutor.(*executor))
+ }
assert.True(t, tt.executorValidator(t, fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
})
}
@@ -329,6 +429,7 @@ func TestWorker_work(t *testing.T) {
fields fields
timeBeforeFirstValidation time.Duration
firstValidation func(*testing.T, *worker)
+ timeBeforeManipulation time.Duration
manipulator func(*worker)
timeBeforeSecondValidation time.Duration
secondValidation func(*testing.T, *worker)
@@ -356,16 +457,18 @@ func TestWorker_work(t *testing.T) {
},
timeBeforeFirstValidation: 50 * time.Millisecond,
firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded)
+ assert.False(t, w.hasEnded.Load(), "should not be ended")
},
manipulator: func(w *worker) {
w.shutdown.Store(true)
+ w.interrupter <- struct{}{}
},
timeBeforeSecondValidation: 150 * time.Millisecond,
secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded)
+ assert.True(t, w.hasEnded.Load(), "should be ended")
},
- }, {
+ },
+ {
name: "Worker should work till shutdown",
fields: fields{
id: 1,
@@ -388,29 +491,94 @@ func TestWorker_work(t *testing.T) {
},
timeBeforeFirstValidation: 50 * time.Millisecond,
firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded, "should not be ended")
+ assert.False(t, w.hasEnded.Load(), "should not be ended")
},
manipulator: func(w *worker) {
w.shutdown.Store(true)
},
timeBeforeSecondValidation: 150 * time.Millisecond,
secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded, "should be ended")
+ assert.True(t, w.hasEnded.Load(), "should be ended")
+ },
+ },
+ {
+ name: "Work interrupted",
+ fields: fields{
+ id: 1,
+ executor: func() *executor {
+ e := &executor{
+ queue: make(chan workItem),
+ traceWorkers: true,
+ }
+ return e
+ }(),
+ },
+ timeBeforeFirstValidation: 50 * time.Millisecond,
+ firstValidation: func(t *testing.T, w *worker) {
+ assert.False(t, w.hasEnded.Load(), "should not be ended")
+ },
+ manipulator: func(w *worker) {
+ w.shutdown.Store(true)
+ w.interrupter <- struct{}{}
+ },
+ timeBeforeSecondValidation: 150 * time.Millisecond,
+ secondValidation: func(t *testing.T, w *worker) {
+ assert.True(t, w.hasEnded.Load(), "should be ended")
+ },
+ },
+ {
+ name: "Work on canceled",
+ fields: fields{
+ id: 1,
+ executor: func() *executor {
+ e := &executor{
+ queue: make(chan workItem),
+ traceWorkers: true,
+ }
+ go func() {
+ completionFuture := &future{}
+ completionFuture.cancelRequested.Store(true)
+ e.queue <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ time.Sleep(time.Millisecond * 70)
+ },
+ completionFuture: completionFuture,
+ }
+ }()
+ return e
+ }(),
+ },
+ timeBeforeManipulation: 50 * time.Millisecond,
+ manipulator: func(w *worker) {
+ w.shutdown.Store(true)
+ w.interrupter <- struct{}{}
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
w := &worker{
- id: tt.fields.id,
- executor: tt.fields.executor,
+ id: tt.fields.id,
+ interrupter: make(chan struct{}, 1),
+ executor: tt.fields.executor,
}
go w.work()
- time.Sleep(tt.timeBeforeFirstValidation)
- tt.firstValidation(t, w)
- tt.manipulator(w)
- time.Sleep(tt.timeBeforeSecondValidation)
- tt.secondValidation(t, w)
+ if tt.firstValidation != nil {
+ time.Sleep(tt.timeBeforeFirstValidation)
+ t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
+ tt.firstValidation(t, w)
+ }
+ if tt.manipulator != nil {
+ time.Sleep(tt.timeBeforeManipulation)
+ t.Logf("manipulator after %v", tt.timeBeforeManipulation)
+ tt.manipulator(w)
+ }
+ if tt.secondValidation != nil {
+ time.Sleep(tt.timeBeforeSecondValidation)
+ t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
+ tt.secondValidation(t, w)
+ }
})
}
}