You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/03 08:41:08 UTC

[plc4x] 02/02: fix(plc4go/spi): make shutdown of WorkerPool more reliable

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 7d745dae3c663ab5c7b4a26a4a444c78fc8433ed
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Sat Jun 3 10:40:58 2023 +0200

    fix(plc4go/spi): make shutdown of WorkerPool more reliable
---
 plc4go/spi/pool/WorkerPool.go      | 138 +++++++++++++++++++++++++++-------
 plc4go/spi/pool/WorkerPool_test.go | 147 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 258 insertions(+), 27 deletions(-)

diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index c9c031efa1..bcf143bef8 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"io"
@@ -42,6 +43,7 @@ type worker struct {
 	executor    interface {
 		isTraceWorkers() bool
 		getWorksItems() chan workItem
+		getWorkerWaitGroup() *sync.WaitGroup
 	}
 	hasEnded     atomic.Bool
 	lastReceived time.Time
@@ -58,6 +60,8 @@ func (w *worker) initialize() {
 }
 
 func (w *worker) work() {
+	w.executor.getWorkerWaitGroup().Add(1)
+	defer w.executor.getWorkerWaitGroup().Done()
 	defer func() {
 		if recovered := recover(); recovered != nil {
 			w.log.Error().Msgf("Recovering from panic():%v. Stack: %s", recovered, debug.Stack())
@@ -104,8 +108,8 @@ type workItem struct {
 	completionFuture *future
 }
 
-func (w *workItem) String() string {
-	return fmt.Sprintf("Workitem{wid:%d}", w.workItemId)
+func (w workItem) String() string {
+	return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
 }
 
 type Executor interface {
@@ -117,13 +121,15 @@ type Executor interface {
 }
 
 type executor struct {
-	maxNumberOfWorkers int
-	running            bool
-	shutdown           bool
-	stateChange        sync.Mutex
-	worker             []*worker
-	workItems          chan workItem
-	traceWorkers       bool
+	running      bool
+	shutdown     bool
+	stateChange  sync.Mutex
+	worker       []*worker
+	queueDepth   int
+	workItems    chan workItem
+	traceWorkers bool
+
+	workerWaitGroup sync.WaitGroup
 
 	log zerolog.Logger
 }
@@ -136,9 +142,19 @@ func (e *executor) getWorksItems() chan workItem {
 	return e.workItems
 }
 
+func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
+	return &e.workerWaitGroup
+}
+
 type dynamicExecutor struct {
-	executor
-	maxNumberOfWorkers int
+	*executor
+
+	maxNumberOfWorkers     int
+	currentNumberOfWorkers atomic.Int32
+	dynamicStateChange     sync.Mutex
+	interrupter            chan struct{}
+
+	dynamicWorkers sync.WaitGroup
 }
 
 func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
@@ -151,10 +167,10 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
 		}
 	}
 	_executor := &executor{
-		maxNumberOfWorkers: numberOfWorkers,
-		workItems:          make(chan workItem, queueDepth),
-		worker:             workers,
-		log:                customLogger,
+		queueDepth: queueDepth,
+		workItems:  make(chan workItem, queueDepth),
+		worker:     workers,
+		log:        customLogger,
 	}
 	for _, option := range _options {
 		switch option := option.(type) {
@@ -175,11 +191,10 @@ var timeToBecomeUnused = 5 * time.Second
 func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	customLogger := options.ExtractCustomLogger(_options...)
 	_executor := &dynamicExecutor{
-		executor: executor{
-			maxNumberOfWorkers: maxNumberOfWorkers,
-			workItems:          make(chan workItem, queueDepth),
-			worker:             make([]*worker, 0),
-			log:                customLogger,
+		executor: &executor{
+			workItems: make(chan workItem, queueDepth),
+			worker:    make([]*worker, 0),
+			log:       customLogger,
 		},
 		maxNumberOfWorkers: maxNumberOfWorkers,
 	}
@@ -240,6 +255,7 @@ func (e *executor) Start() {
 	e.stateChange.Lock()
 	defer e.stateChange.Unlock()
 	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
 		return
 	}
 	e.running = true
@@ -252,10 +268,25 @@ func (e *executor) Start() {
 }
 
 func (e *dynamicExecutor) Start() {
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	if e.interrupter != nil {
+		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
+		close(e.interrupter)
+		e.dynamicWorkers.Wait()
+	}
+
 	e.executor.Start()
 	mutex := sync.Mutex{}
+	e.interrupter = make(chan struct{})
 	// Worker spawner
 	go func() {
+		e.dynamicWorkers.Add(1)
+		defer e.dynamicWorkers.Done()
 		defer func() {
 			if err := recover(); err != nil {
 				e.log.Error().Msgf("panic-ed %v", err)
@@ -266,13 +297,13 @@ func (e *dynamicExecutor) Start() {
 			workerLog = zerolog.Nop()
 		}
 		for e.running && !e.shutdown {
-			workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
-			time.Sleep(upScaleInterval)
+			workerLog.Trace().Msg("running")
 			mutex.Lock()
 			numberOfItemsInQueue := len(e.workItems)
 			numberOfWorkers := len(e.worker)
-			workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+			workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
 			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+				workerLog.Trace().Msg("spawning new worker")
 				_worker := &worker{
 					id:           numberOfWorkers - 1,
 					interrupter:  make(chan struct{}, 1),
@@ -284,14 +315,28 @@ func (e *dynamicExecutor) Start() {
 				_worker.initialize()
 				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
 				go _worker.work()
+				e.currentNumberOfWorkers.Add(1)
 			} else {
 				workerLog.Trace().Msg("Nothing to scale")
 			}
 			mutex.Unlock()
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+				timer := time.NewTimer(upScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
 		}
+		workerLog.Info().Msg("Terminated")
 	}()
 	// Worker killer
 	go func() {
+		e.dynamicWorkers.Add(1)
+		defer e.dynamicWorkers.Done()
 		defer func() {
 			if err := recover(); err != nil {
 				e.log.Error().Msgf("panic-ed %v", err)
@@ -302,8 +347,7 @@ func (e *dynamicExecutor) Start() {
 			workerLog = zerolog.Nop()
 		}
 		for e.running && !e.shutdown {
-			workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
-			time.Sleep(downScaleInterval)
+			workerLog.Trace().Msg("running")
 			mutex.Lock()
 			newWorkers := make([]*worker, 0)
 			for _, _worker := range e.worker {
@@ -313,6 +357,7 @@ func (e *dynamicExecutor) Start() {
 					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
 					_worker.interrupted.Store(true)
 					close(_worker.interrupter)
+					e.currentNumberOfWorkers.Add(-1)
 				} else {
 					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
 					newWorkers = append(newWorkers, _worker)
@@ -320,14 +365,27 @@ func (e *dynamicExecutor) Start() {
 			}
 			e.worker = newWorkers
 			mutex.Unlock()
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+				timer := time.NewTimer(downScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
 		}
+		workerLog.Info().Msg("Terminated")
 	}()
 }
 
 func (e *executor) Stop() {
+	e.log.Trace().Msg("stopping now")
 	e.stateChange.Lock()
 	defer e.stateChange.Unlock()
 	if !e.running || e.shutdown {
+		e.log.Warn().Msg("already stopped")
 		return
 	}
 	e.shutdown = true
@@ -337,9 +395,27 @@ func (e *executor) Stop() {
 		worker.interrupted.Store(true)
 		close(worker.interrupter)
 	}
-	close(e.workItems)
 	e.running = false
 	e.shutdown = false
+	e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
+	e.workerWaitGroup.Wait()
+	e.log.Trace().Msg("stopped")
+}
+
+func (e *dynamicExecutor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if !e.running || e.shutdown {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	close(e.interrupter)
+	e.log.Trace().Msg("stopping inner executor")
+	e.executor.Stop()
+	e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
+	e.dynamicWorkers.Wait()
+	e.log.Trace().Msg("stopped")
 }
 
 func (e *executor) Close() error {
@@ -395,3 +471,13 @@ func (f *future) AwaitCompletion(ctx context.Context) error {
 	}
 	return nil
 }
+
+func (f *future) String() string {
+	return fmt.Sprintf("future: cancelRequested(%t), interruptRequested(%t), completed(%t), errored(%t), err(%v)",
+		f.cancelRequested.Load(),
+		f.interruptRequested.Load(),
+		f.completed.Load(),
+		f.errored.Load(),
+		f.err.Load(),
+	)
+}
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index b084e61d56..9a62a09d25 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -420,7 +420,7 @@ func TestWorkItem_String(t *testing.T) {
 	}{
 		{
 			name: "Simple test",
-			want: "Workitem{wid:0}",
+			want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
 		},
 	}
 	for _, tt := range tests {
@@ -743,6 +743,151 @@ func Test_future_complete(t *testing.T) {
 	}
 }
 
+func Test_dynamicExecutor_Start(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+	}
+	tests := []struct {
+		name       string
+		fields     fields
+		setup      func(t *testing.T, fields *fields)
+		startTwice bool
+	}{
+		{
+			name: "just start",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "start twice",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			startTwice: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+			}
+			e.Start()
+			if tt.startTwice {
+				e.Start()
+			}
+			// Let it work a bit
+			time.Sleep(20 * time.Millisecond)
+			t.Log("done with test")
+			t.Cleanup(e.Stop)
+		})
+	}
+}
+
+func Test_dynamicExecutor_Stop(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+		interrupter        chan struct{}
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		setup     func(t *testing.T, fields *fields)
+		startIt   bool
+		stopTwice bool
+	}{
+		{
+			name: "just stop",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "stop started",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "stop twice",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			stopTwice: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+				interrupter:        tt.fields.interrupter,
+			}
+			if tt.startIt {
+				e.Start()
+			}
+			e.Stop()
+			if tt.stopTwice {
+				e.Stop()
+			}
+		})
+	}
+}
+
 // from: https://github.com/golang/go/issues/36532#issuecomment-575535452
 func testContext(t *testing.T) context.Context {
 	ctx, cancel := context.WithCancel(context.Background())