Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/15 13:37:30 UTC

[plc4x] branch develop updated: test(plc4go/spi): add more tests for worker pool

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new a586784764 test(plc4go/spi): add more tests for worker pool
a586784764 is described below

commit a5867847646ebd6758a901df1a25df718aace977
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon May 15 15:37:21 2023 +0200

    test(plc4go/spi): add more tests for worker pool
---
 plc4go/spi/utils/WorkerPool.go      |  76 ++++++++-----
 plc4go/spi/utils/WorkerPool_test.go | 208 ++++++++++++++++++++++++++++++++----
 2 files changed, 240 insertions(+), 44 deletions(-)
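
The substantive production change in WorkerPool.go is that worker.hasEnded moves from a plain bool to atomic.Bool: the reworked tests poll the flag from the test goroutine while the worker goroutine writes it, which with a plain bool is a data race. A minimal sketch of that pattern, using hypothetical toy names rather than the PLC4X types:

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // toyWorker stands in for the pool worker. With a plain bool the flag
    // would be a data race under `go test -race`, because one goroutine
    // polls it while another writes it.
    type toyWorker struct {
        hasEnded atomic.Bool
    }

    func (w *toyWorker) work(done <-chan struct{}) {
        <-done
        w.hasEnded.Store(true) // concurrent write, race-free via atomic.Bool
    }

    func main() {
        w := &toyWorker{}
        done := make(chan struct{})
        go w.work(done)

        fmt.Println("ended before shutdown:", w.hasEnded.Load()) // concurrent read
        close(done)
        time.Sleep(10 * time.Millisecond) // crude, test-style wait
        fmt.Println("ended after shutdown:", w.hasEnded.Load())
    }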

diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/utils/WorkerPool.go
index 578b0d38e8..a2247065e9 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/utils/WorkerPool.go
@@ -39,7 +39,7 @@ type worker struct {
 	interrupted  atomic.Bool
 	interrupter  chan struct{}
 	executor     *executor
-	hasEnded     bool
+	hasEnded     atomic.Bool
 	lastReceived time.Time
 }
 
@@ -47,7 +47,7 @@ func (w *worker) initialize() {
 	w.shutdown.Store(false)
 	w.interrupted.Store(false)
 	w.interrupter = make(chan struct{})
-	w.hasEnded = false
+	w.hasEnded.Store(false)
 	w.lastReceived = time.Now()
 }
 
@@ -61,11 +61,13 @@ func (w *worker) work() {
 			w.work()
 		}
 	}()
-	w.hasEnded = false
 	workerLog := log.With().Int("Worker id", w.id).Logger()
 	if !w.executor.traceWorkers {
 		workerLog = zerolog.Nop()
 	}
+	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
+	w.hasEnded.Store(false)
+	workerLog.Debug().Msgf("setting to not ended")
 
 	for !w.shutdown.Load() {
 		workerLog.Debug().Msg("Working")
@@ -83,13 +85,11 @@ func (w *worker) work() {
 				workerLog.Debug().Msgf("work item %v completed", _workItem)
 			}
 		case <-w.interrupter:
-			log.Debug().Msg("We got interrupted")
-		default:
-			workerLog.Debug().Msgf("Idling")
-			time.Sleep(time.Millisecond * 10)
+			workerLog.Debug().Msg("We got interrupted")
 		}
 	}
-	w.hasEnded = true
+	w.hasEnded.Store(true)
+	workerLog.Debug().Msg("setting to ended")
 }
 
 type workItem struct {
@@ -140,6 +140,10 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options ...ExecutorOp
 	return _executor
 }
 
+var upScaleInterval = 100 * time.Millisecond
+var downScaleInterval = 5 * time.Second
+var timeToBecomeUnused = 5 * time.Second
+
 func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorOption) Executor {
 	_executor := &executor{
 		maxNumberOfWorkers: maxNumberOfWorkers,
@@ -151,8 +155,10 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
 	}
 	// We spawn one initial worker
 	_executor.worker = append(_executor.worker, &worker{
-		id:       0,
-		executor: _executor,
+		id:           0,
+		interrupter:  make(chan struct{}, 1),
+		executor:     _executor,
+		lastReceived: time.Now(),
 	})
 	mutex := sync.Mutex{}
 	// Worker spawner
@@ -162,17 +168,30 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
 				log.Error().Msgf("panic-ed %v", err)
 			}
 		}()
+		workerLog := log.With().Str("Worker type", "spawner").Logger()
+		if !_executor.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
 		for {
-			time.Sleep(100 * time.Millisecond)
+			workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+			time.Sleep(upScaleInterval)
 			mutex.Lock()
-			if len(_executor.queue) > len(_executor.worker) && len(_executor.worker) < maxNumberOfWorkers {
-				worker := &worker{
-					id:       len(_executor.worker) - 1,
-					executor: _executor,
+			numberOfItemsInQueue := len(_executor.queue)
+			numberOfWorkers := len(_executor.worker)
+			workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, maxNumberOfWorkers)
+			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < maxNumberOfWorkers {
+				_worker := &worker{
+					id:           numberOfWorkers - 1,
+					interrupter:  make(chan struct{}, 1),
+					executor:     _executor,
+					lastReceived: time.Now(),
 				}
-				_executor.worker = append(_executor.worker, worker)
-				worker.initialize()
-				go worker.work()
+				_executor.worker = append(_executor.worker, _worker)
+				_worker.initialize()
+				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
+				go _worker.work()
+			} else {
+				workerLog.Trace().Msg("Nothing to scale")
 			}
 			mutex.Unlock()
 		}
@@ -184,16 +203,25 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
 				log.Error().Msgf("panic-ed %v", err)
 			}
 		}()
+		workerLog := log.With().Str("Worker type", "killer").Logger()
+		if !_executor.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
 		for {
-			time.Sleep(5 * time.Second)
+			workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+			time.Sleep(downScaleInterval)
 			mutex.Lock()
 			newWorkers := make([]*worker, 0)
-			for _, worker := range _executor.worker {
-				if worker.lastReceived.Before(time.Now().Add(-5 * time.Second)) {
-					worker.interrupted.Store(true)
-					close(worker.interrupter)
+			for _, _worker := range _executor.worker {
+				deadline := time.Now().Add(-timeToBecomeUnused)
+				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+				if _worker.lastReceived.Before(deadline) {
+					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
+					_worker.interrupted.Store(true)
+					close(_worker.interrupter)
 				} else {
-					newWorkers = append(newWorkers, worker)
+					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
+					newWorkers = append(newWorkers, _worker)
 				}
 			}
 			_executor.worker = newWorkers
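
The scaling knobs that used to be inline constants (100 ms up-scale check, 5 s down-scale check, 5 s idle timeout) are now the package-level variables upScaleInterval, downScaleInterval and timeToBecomeUnused, which is what lets the new "test scaling" case below shrink them and watch workers get spawned and reaped within a few hundred milliseconds. A sketch of that override pattern as a single helper; the helper name is hypothetical, the committed test inlines the same three blocks, and it would have to sit next to the test in package utils because the variables are unexported:

    func shortenScalingIntervalsForTest(t *testing.T) {
        // Remember the production defaults and restore them when the test ends.
        oldUp, oldDown, oldUnused := upScaleInterval, downScaleInterval, timeToBecomeUnused
        t.Cleanup(func() {
            upScaleInterval, downScaleInterval, timeToBecomeUnused = oldUp, oldDown, oldUnused
        })
        // Aggressive values so the spawner and killer goroutines react within a test run.
        upScaleInterval = 10 * time.Millisecond
        downScaleInterval = 10 * time.Millisecond
        timeToBecomeUnused = 100 * time.Millisecond
    }

Because this mutates package state, tests relying on it should not run in parallel with other executor tests.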
diff --git a/plc4go/spi/utils/WorkerPool_test.go b/plc4go/spi/utils/WorkerPool_test.go
index ade30c2fb7..fcb006b982 100644
--- a/plc4go/spi/utils/WorkerPool_test.go
+++ b/plc4go/spi/utils/WorkerPool_test.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
+	"math/rand"
 	"testing"
 	"time"
 )
@@ -98,6 +99,13 @@ func TestExecutor_Stop(t *testing.T) {
 			fields: fields{
 				running: true,
 				queue:   make(chan workItem),
+				worker: []*worker{
+					func() *worker {
+						w := &worker{}
+						w.initialize()
+						return w
+					}(),
+				},
 			},
 			shouldRun: false,
 		},
@@ -133,9 +141,37 @@ func TestExecutor_Submit(t *testing.T) {
 		name                      string
 		fields                    fields
 		args                      args
-		completionFutureValidator func(CompletionFuture) bool
+		completionFutureValidator func(t *testing.T, future CompletionFuture) bool
 		waitForCompletion         bool
 	}{
+		{
+			name: "submitting nothing",
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				return assert.Error(t, completionFuture.(*future).err.Load().(error))
+			},
+		},
+		{
+			name: "submit canceled",
+			fields: fields{
+				queue: make(chan workItem, 0),
+			},
+			args: args{
+				workItemId: 13,
+				runnable: func() {
+					// We do something for 3 seconds
+					<-time.NewTimer(3 * time.Second).C
+				},
+				context: func() context.Context {
+					ctx, cancelFunc := context.WithCancel(context.Background())
+					cancelFunc()
+					return ctx
+				}(),
+			},
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+				err := completionFuture.(*future).err.Load().(error)
+				return assert.Error(t, err)
+			},
+		},
 		{
 			name: "Submit something which doesn't complete",
 			fields: fields{
@@ -149,9 +185,9 @@ func TestExecutor_Submit(t *testing.T) {
 				},
 				context: context.TODO(),
 			},
-			completionFutureValidator: func(completionFuture CompletionFuture) bool {
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
 				completed := completionFuture.(*future).completed.Load()
-				return !completed
+				return assert.False(t, completed)
 			},
 		},
 		{
@@ -173,9 +209,9 @@ func TestExecutor_Submit(t *testing.T) {
 				},
 				context: context.TODO(),
 			},
-			completionFutureValidator: func(completionFuture CompletionFuture) bool {
+			completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
 				completed := completionFuture.(*future).completed.Load()
-				return completed
+				return assert.True(t, completed)
 			},
 			waitForCompletion: true,
 		},
@@ -194,7 +230,7 @@ func TestExecutor_Submit(t *testing.T) {
 			if tt.waitForCompletion {
 				assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
 			}
-			assert.True(t, tt.completionFutureValidator(completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
+			assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
 		})
 	}
 }
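
In the test file, the completionFutureValidator callbacks now take *testing.T and return the result of an assert call, so a failed expectation is reported with the offending value at the call site instead of surfacing only as the outer assert.True failing with a bare "expected true". Distilled from the hunks above:

    // before: opaque failure, only the outer assert.True reports
    completionFutureValidator: func(completionFuture CompletionFuture) bool {
        return completionFuture.(*future).completed.Load()
    },

    // after: testify reports the failed expectation and its value directly
    completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
        return assert.True(t, completionFuture.(*future).completed.Load())
    },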
@@ -215,7 +251,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
 			args: args{
 				numberOfWorkers: 13,
 				queueDepth:      14,
-				options:         nil,
+				options:         []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
 			},
 			executorValidator: func(t *testing.T, e *executor) bool {
 				return !e.running && !e.shutdown && len(e.worker) == 13 && cap(e.queue) == 14
@@ -225,6 +261,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			fixedSizeExecutor := NewFixedSizeExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
+			defer fixedSizeExecutor.Stop()
 			assert.True(t, tt.executorValidator(t, fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
 		})
 	}
@@ -239,6 +276,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 	tests := []struct {
 		name              string
 		args              args
+		manipulator       func(*testing.T, *executor)
 		executorValidator func(*testing.T, *executor) bool
 	}{
 		{
@@ -246,7 +284,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 			args: args{
 				numberOfWorkers: 13,
 				queueDepth:      14,
-				options:         nil,
+				options:         []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
 			},
 			executorValidator: func(t *testing.T, e *executor) bool {
 				assert.False(t, e.running)
@@ -256,10 +294,72 @@ func TestNewDynamicExecutor(t *testing.T) {
 				return true
 			},
 		},
+		{
+			name: "test scaling",
+			args: args{
+				numberOfWorkers: 2,
+				queueDepth:      2,
+				options:         []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
+			},
+			manipulator: func(t *testing.T, e *executor) {
+				{
+					oldUpScaleInterval := upScaleInterval
+					t.Cleanup(func() {
+						t.Logf("Resetting up scale interval to %v", oldUpScaleInterval)
+						upScaleInterval = oldUpScaleInterval
+					})
+					upScaleInterval = 10 * time.Millisecond
+					t.Logf("Changed up scale interval to %v", upScaleInterval)
+				}
+				{
+					oldDownScaleInterval := downScaleInterval
+					t.Cleanup(func() {
+						t.Logf("Resetting down scale interval to %v", oldDownScaleInterval)
+						downScaleInterval = oldDownScaleInterval
+					})
+					downScaleInterval = 10 * time.Millisecond
+					t.Logf("Changed down scale interval to %v", downScaleInterval)
+				}
+				{
+					oldTimeToBecomeUnused := timeToBecomeUnused
+					t.Cleanup(func() {
+						t.Logf("Resetting time to become unused to %v", oldTimeToBecomeUnused)
+						timeToBecomeUnused = oldTimeToBecomeUnused
+					})
+					timeToBecomeUnused = 100 * time.Millisecond
+				}
+				t.Log("fill some jobs")
+				go func() {
+					for i := 0; i < 500; i++ {
+						e.queue <- workItem{
+							workItemId: int32(i),
+							runnable: func() {
+								max := 100
+								min := 10
+								sleepTime := time.Duration(rand.Intn(max-min)+min) * time.Millisecond
+								t.Logf("Sleeping for %v", sleepTime)
+								time.Sleep(sleepTime)
+							},
+							completionFuture: &future{},
+						}
+					}
+				}()
+			},
+			executorValidator: func(t *testing.T, e *executor) bool {
+				time.Sleep(500 * time.Millisecond)
+				assert.False(t, e.running)
+				assert.False(t, e.shutdown)
+				return true
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			fixedSizeExecutor := NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
+			defer fixedSizeExecutor.Stop()
+			if tt.manipulator != nil {
+				tt.manipulator(t, fixedSizeExecutor.(*executor))
+			}
 			assert.True(t, tt.executorValidator(t, fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
 		})
 	}
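
The new "test scaling" case drives the executor by writing work items straight onto the internal queue; for orientation, a minimal end-to-end sketch through the public API follows. The Start and Submit signatures are assumptions inferred from the tests in this file (Submit's args are a context, an int32 work item id and a runnable; Stop and AwaitCompletion appear verbatim above), so check the Executor interface in WorkerPool.go before relying on them:

    package main

    import (
        "context"
        "fmt"

        "github.com/apache/plc4x/plc4go/spi/utils"
    )

    func main() {
        executor := utils.NewDynamicExecutor(4, 100)
        executor.Start() // assumed counterpart to the Stop() used in the tests
        defer executor.Stop()

        completionFuture := executor.Submit(context.Background(), 1, func() {
            fmt.Println("doing the work")
        })
        if err := completionFuture.AwaitCompletion(context.Background()); err != nil {
            fmt.Println("work item did not complete:", err)
        }
    }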
@@ -329,6 +429,7 @@ func TestWorker_work(t *testing.T) {
 		fields                     fields
 		timeBeforeFirstValidation  time.Duration
 		firstValidation            func(*testing.T, *worker)
+		timeBeforeManipulation     time.Duration
 		manipulator                func(*worker)
 		timeBeforeSecondValidation time.Duration
 		secondValidation           func(*testing.T, *worker)
@@ -356,16 +457,18 @@ func TestWorker_work(t *testing.T) {
 			},
 			timeBeforeFirstValidation: 50 * time.Millisecond,
 			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded)
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
 			},
 			manipulator: func(w *worker) {
 				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
 			},
 			timeBeforeSecondValidation: 150 * time.Millisecond,
 			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded)
+				assert.True(t, w.hasEnded.Load(), "should be ended")
 			},
-		}, {
+		},
+		{
 			name: "Worker should work till shutdown",
 			fields: fields{
 				id: 1,
@@ -388,29 +491,94 @@ func TestWorker_work(t *testing.T) {
 			},
 			timeBeforeFirstValidation: 50 * time.Millisecond,
 			firstValidation: func(t *testing.T, w *worker) {
-				assert.False(t, w.hasEnded, "should not be ended")
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
 			},
 			manipulator: func(w *worker) {
 				w.shutdown.Store(true)
 			},
 			timeBeforeSecondValidation: 150 * time.Millisecond,
 			secondValidation: func(t *testing.T, w *worker) {
-				assert.True(t, w.hasEnded, "should be ended")
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			name: "Work interrupted",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						queue:        make(chan workItem),
+						traceWorkers: true,
+					}
+					return e
+				}(),
+			},
+			timeBeforeFirstValidation: 50 * time.Millisecond,
+			firstValidation: func(t *testing.T, w *worker) {
+				assert.False(t, w.hasEnded.Load(), "should not be ended")
+			},
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
+			},
+			timeBeforeSecondValidation: 150 * time.Millisecond,
+			secondValidation: func(t *testing.T, w *worker) {
+				assert.True(t, w.hasEnded.Load(), "should be ended")
+			},
+		},
+		{
+			name: "Work on canceled",
+			fields: fields{
+				id: 1,
+				executor: func() *executor {
+					e := &executor{
+						queue:        make(chan workItem),
+						traceWorkers: true,
+					}
+					go func() {
+						completionFuture := &future{}
+						completionFuture.cancelRequested.Store(true)
+						e.queue <- workItem{
+							workItemId: 0,
+							runnable: func() {
+								time.Sleep(time.Millisecond * 70)
+							},
+							completionFuture: completionFuture,
+						}
+					}()
+					return e
+				}(),
+			},
+			timeBeforeManipulation: 50 * time.Millisecond,
+			manipulator: func(w *worker) {
+				w.shutdown.Store(true)
+				w.interrupter <- struct{}{}
 			},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			w := &worker{
-				id:       tt.fields.id,
-				executor: tt.fields.executor,
+				id:          tt.fields.id,
+				interrupter: make(chan struct{}, 1),
+				executor:    tt.fields.executor,
 			}
 			go w.work()
-			time.Sleep(tt.timeBeforeFirstValidation)
-			tt.firstValidation(t, w)
-			tt.manipulator(w)
-			time.Sleep(tt.timeBeforeSecondValidation)
-			tt.secondValidation(t, w)
+			if tt.firstValidation != nil {
+				time.Sleep(tt.timeBeforeFirstValidation)
+				t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
+				tt.firstValidation(t, w)
+			}
+			if tt.manipulator != nil {
+				time.Sleep(tt.timeBeforeManipulation)
+				t.Logf("manipulator after %v", tt.timeBeforeManipulation)
+				tt.manipulator(w)
+			}
+			if tt.secondValidation != nil {
+				time.Sleep(tt.timeBeforeSecondValidation)
+				t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
+				tt.secondValidation(t, w)
+			}
 		})
 	}
 }
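
Since the point of moving hasEnded to atomic.Bool and pre-sizing the interrupter channels is to keep these concurrent tests safe, they are the kind worth running under the race detector, e.g. `go test -race ./spi/utils/...` from the plc4go directory.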