Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 10:22:40 UTC

[plc4x] branch develop updated: fix(plc4go/spi): properly shutdown worker spawner and killer on shutdown

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new d9584bcde7 fix(plc4go/spi): properly shutdown worker spawner and killer on shutdown
d9584bcde7 is described below

commit d9584bcde767b716a46f1554b2cf90f79335bab9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 12:22:32 2023 +0200

    fix(plc4go/spi): properly shutdown worker spawner and killer on shutdown
---
 plc4go/spi/pool/WorkerPool.go      | 183 +++++++++++++++++++++----------------
 plc4go/spi/pool/WorkerPool_test.go |  18 ++--
 2 files changed, 112 insertions(+), 89 deletions(-)
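Note for readers skimming the diff below: the core of the fix is that the worker spawner and killer goroutines no longer run as unconditional for-loops started inside NewDynamicExecutor; they are moved into dynamicExecutor.Start() and re-check the executor's running/shutdown state on every pass, so Stop() lets them terminate. The following is a minimal, self-contained sketch of that lifecycle pattern, not the actual WorkerPool code: the scaler type and its fields are illustrative, and the sketch uses atomic.Bool for the flags, which may differ from the real executor fields.

    package main

    import (
        "fmt"
        "sync/atomic"
        "time"
    )

    // scaler mirrors the idea in dynamicExecutor.Start(): the background
    // goroutine keeps running only while its owner is started and not
    // shut down, instead of looping forever.
    type scaler struct {
        running  atomic.Bool
        shutdown atomic.Bool
    }

    func (s *scaler) Start() {
        s.running.Store(true)
        go func() {
            for s.running.Load() && !s.shutdown.Load() {
                // the up-scale / down-scale checks would go here
                fmt.Println("scaling check")
                time.Sleep(10 * time.Millisecond)
            }
            fmt.Println("scaler goroutine exited")
        }()
    }

    func (s *scaler) Stop() {
        s.shutdown.Store(true)
        s.running.Store(false)
    }

    func main() {
        s := &scaler{}
        s.Start()
        time.Sleep(50 * time.Millisecond)
        s.Stop()
        time.Sleep(50 * time.Millisecond) // give the goroutine time to observe the flags and exit
    }
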

diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index c1881f1297..c9c031efa1 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -35,11 +35,14 @@ import (
 type Runnable func()
 
 type worker struct {
-	id           int
-	shutdown     atomic.Bool
-	interrupted  atomic.Bool
-	interrupter  chan struct{}
-	executor     *executor
+	id          int
+	shutdown    atomic.Bool
+	interrupted atomic.Bool
+	interrupter chan struct{}
+	executor    interface {
+		isTraceWorkers() bool
+		getWorksItems() chan workItem
+	}
 	hasEnded     atomic.Bool
 	lastReceived time.Time
 
@@ -65,7 +68,7 @@ func (w *worker) work() {
 		}
 	}()
 	workerLog := w.log.With().Int("Worker id", w.id).Logger()
-	if !w.executor.traceWorkers {
+	if !w.executor.isTraceWorkers() {
 		workerLog = zerolog.Nop()
 	}
 	workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
@@ -75,7 +78,7 @@ func (w *worker) work() {
 	for !w.shutdown.Load() {
 		workerLog.Debug().Msg("Working")
 		select {
-		case _workItem := <-w.executor.workItems:
+		case _workItem := <-w.executor.getWorksItems():
 			w.lastReceived = time.Now()
 			workerLog.Debug().Msgf("Got work item %v", _workItem)
 			if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
@@ -125,6 +128,19 @@ type executor struct {
 	log zerolog.Logger
 }
 
+func (e *executor) isTraceWorkers() bool {
+	return e.traceWorkers
+}
+
+func (e *executor) getWorksItems() chan workItem {
+	return e.workItems
+}
+
+type dynamicExecutor struct {
+	executor
+	maxNumberOfWorkers int
+}
+
 func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	workers := make([]*worker, numberOfWorkers)
 	customLogger := options.ExtractCustomLogger(_options...)
@@ -158,11 +174,14 @@ var timeToBecomeUnused = 5 * time.Second
 
 func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	customLogger := options.ExtractCustomLogger(_options...)
-	_executor := &executor{
+	_executor := &dynamicExecutor{
+		executor: executor{
+			maxNumberOfWorkers: maxNumberOfWorkers,
+			workItems:          make(chan workItem, queueDepth),
+			worker:             make([]*worker, 0),
+			log:                customLogger,
+		},
 		maxNumberOfWorkers: maxNumberOfWorkers,
-		workItems:          make(chan workItem, queueDepth),
-		worker:             make([]*worker, 0),
-		log:                customLogger,
 	}
 	for _, option := range _options {
 		switch option := option.(type) {
@@ -178,75 +197,6 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
 		lastReceived: time.Now(),
 		log:          customLogger,
 	})
-	mutex := sync.Mutex{}
-	// Worker spawner
-	go func() {
-		defer func() {
-			if err := recover(); err != nil {
-				customLogger.Error().Msgf("panic-ed %v", err)
-			}
-		}()
-		workerLog := customLogger.With().Str("Worker type", "spawner").Logger()
-		if !_executor.traceWorkers {
-			workerLog = zerolog.Nop()
-		}
-		for {
-			workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
-			time.Sleep(upScaleInterval)
-			mutex.Lock()
-			numberOfItemsInQueue := len(_executor.workItems)
-			numberOfWorkers := len(_executor.worker)
-			workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, maxNumberOfWorkers)
-			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < maxNumberOfWorkers {
-				_worker := &worker{
-					id:           numberOfWorkers - 1,
-					interrupter:  make(chan struct{}, 1),
-					executor:     _executor,
-					lastReceived: time.Now(),
-					log:          customLogger,
-				}
-				_executor.worker = append(_executor.worker, _worker)
-				_worker.initialize()
-				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
-				go _worker.work()
-			} else {
-				workerLog.Trace().Msg("Nothing to scale")
-			}
-			mutex.Unlock()
-		}
-	}()
-	// Worker killer
-	go func() {
-		defer func() {
-			if err := recover(); err != nil {
-				_executor.log.Error().Msgf("panic-ed %v", err)
-			}
-		}()
-		workerLog := customLogger.With().Str("Worker type", "killer").Logger()
-		if !_executor.traceWorkers {
-			workerLog = zerolog.Nop()
-		}
-		for {
-			workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
-			time.Sleep(downScaleInterval)
-			mutex.Lock()
-			newWorkers := make([]*worker, 0)
-			for _, _worker := range _executor.worker {
-				deadline := time.Now().Add(-timeToBecomeUnused)
-				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
-				if _worker.lastReceived.Before(deadline) {
-					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
-					_worker.interrupted.Store(true)
-					close(_worker.interrupter)
-				} else {
-					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
-					newWorkers = append(newWorkers, _worker)
-				}
-			}
-			_executor.worker = newWorkers
-			mutex.Unlock()
-		}
-	}()
 	return _executor
 }
 
@@ -301,6 +251,79 @@ func (e *executor) Start() {
 	}
 }
 
+func (e *dynamicExecutor) Start() {
+	e.executor.Start()
+	mutex := sync.Mutex{}
+	// Worker spawner
+	go func() {
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v", err)
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "spawner").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+			time.Sleep(upScaleInterval)
+			mutex.Lock()
+			numberOfItemsInQueue := len(e.workItems)
+			numberOfWorkers := len(e.worker)
+			workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+				_worker := &worker{
+					id:           numberOfWorkers - 1,
+					interrupter:  make(chan struct{}, 1),
+					executor:     e,
+					lastReceived: time.Now(),
+					log:          e.log,
+				}
+				e.worker = append(e.worker, _worker)
+				_worker.initialize()
+				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
+				go _worker.work()
+			} else {
+				workerLog.Trace().Msg("Nothing to scale")
+			}
+			mutex.Unlock()
+		}
+	}()
+	// Worker killer
+	go func() {
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v", err)
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "killer").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+			time.Sleep(downScaleInterval)
+			mutex.Lock()
+			newWorkers := make([]*worker, 0)
+			for _, _worker := range e.worker {
+				deadline := time.Now().Add(-timeToBecomeUnused)
+				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+				if _worker.lastReceived.Before(deadline) {
+					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
+					_worker.interrupted.Store(true)
+					close(_worker.interrupter)
+				} else {
+					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
+					newWorkers = append(newWorkers, _worker)
+				}
+			}
+			e.worker = newWorkers
+			mutex.Unlock()
+		}
+	}()
+}
+
 func (e *executor) Stop() {
 	e.stateChange.Lock()
 	defer e.stateChange.Unlock()
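
Aside on the refactoring above: the worker struct now depends on a small anonymous interface (isTraceWorkers / getWorksItems) rather than on the concrete *executor, which is what allows the spawner in dynamicExecutor.Start() to pass e, a *dynamicExecutor, as the worker's executor. Below is a self-contained sketch of that interface-narrowing-plus-embedding pattern with illustrative names only; workSource, basePool, dynamicPool and consumeOne are not the real types, and the real getter is spelled getWorksItems.

    package main

    import "fmt"

    // workItem stands in for the pool's work item type.
    type workItem struct{ id int }

    // workSource is the narrow view a worker needs of its executor; it
    // mirrors the anonymous interface added to the worker struct above.
    type workSource interface {
        isTraceWorkers() bool
        getWorkItems() chan workItem
    }

    type basePool struct {
        traceWorkers bool
        workItems    chan workItem
    }

    func (p *basePool) isTraceWorkers() bool        { return p.traceWorkers }
    func (p *basePool) getWorkItems() chan workItem { return p.workItems }

    // dynamicPool embeds basePool and therefore satisfies workSource too,
    // which is what lets a spawner hand the extended pool to a worker
    // without the worker knowing the concrete type.
    type dynamicPool struct {
        basePool
        maxWorkers int
    }

    func consumeOne(src workSource) {
        item := <-src.getWorkItems()
        fmt.Printf("trace=%t, got item %d\n", src.isTraceWorkers(), item.id)
    }

    func main() {
        p := &dynamicPool{basePool: basePool{workItems: make(chan workItem, 1)}, maxWorkers: 4}
        p.workItems <- workItem{id: 42}
        consumeOne(p) // *dynamicPool satisfies workSource via embedding
    }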
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index d19448ec6e..fc64be079b 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -279,8 +279,8 @@ func TestNewDynamicExecutor(t *testing.T) {
 		name              string
 		args              args
 		setup             func(*testing.T, *args)
-		manipulator       func(*testing.T, *executor)
-		executorValidator func(*testing.T, *executor) bool
+		manipulator       func(*testing.T, *dynamicExecutor)
+		executorValidator func(*testing.T, *dynamicExecutor) bool
 	}{
 		{
 			name: "new Executor",
@@ -292,7 +292,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 			setup: func(t *testing.T, args *args) {
 				args.options = append(args.options, options.WithCustomLogger(zerolog.New(zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t)))))
 			},
-			executorValidator: func(t *testing.T, e *executor) bool {
+			executorValidator: func(t *testing.T, e *dynamicExecutor) bool {
 				assert.False(t, e.running)
 				assert.False(t, e.shutdown)
 				assert.Len(t, e.worker, 1)
@@ -310,7 +310,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 			setup: func(t *testing.T, args *args) {
 				args.options = append(args.options, options.WithCustomLogger(zerolog.New(zerolog.NewConsoleWriter(zerolog.ConsoleTestWriter(t)))))
 			},
-			manipulator: func(t *testing.T, e *executor) {
+			manipulator: func(t *testing.T, e *dynamicExecutor) {
 				{
 					oldUpScaleInterval := upScaleInterval
 					t.Cleanup(func() {
@@ -354,7 +354,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 					}
 				}()
 			},
-			executorValidator: func(t *testing.T, e *executor) bool {
+			executorValidator: func(t *testing.T, e *dynamicExecutor) bool {
 				time.Sleep(500 * time.Millisecond)
 				assert.False(t, e.running)
 				assert.False(t, e.shutdown)
@@ -367,12 +367,12 @@ func TestNewDynamicExecutor(t *testing.T) {
 			if tt.setup != nil {
 				tt.setup(t, &tt.args)
 			}
-			fixedSizeExecutor := NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
-			defer fixedSizeExecutor.Stop()
+			dynamicSizedExecutor := NewDynamicExecutor(tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options...)
+			defer dynamicSizedExecutor.Stop()
 			if tt.manipulator != nil {
-				tt.manipulator(t, fixedSizeExecutor.(*executor))
+				tt.manipulator(t, dynamicSizedExecutor.(*dynamicExecutor))
 			}
-			assert.True(t, tt.executorValidator(t, fixedSizeExecutor.(*executor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
+			assert.True(t, tt.executorValidator(t, dynamicSizedExecutor.(*dynamicExecutor)), "NewFixedSizeExecutor(%v, %v, %v)", tt.args.numberOfWorkers, tt.args.queueDepth, tt.args.options)
 		})
 	}
 }
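
One more note on the test changes: NewDynamicExecutor still returns the Executor interface, so the table-driven test now type-asserts the result to *dynamicExecutor before inspecting unexported fields. A minimal sketch of that pattern follows, with hypothetical names and without the testify assertions used in WorkerPool_test.go.

    package main

    import "fmt"

    // Executor is the public interface the constructor returns.
    type Executor interface {
        Stop()
    }

    type dynamicExecutor struct {
        running  bool
        shutdown bool
    }

    func (e *dynamicExecutor) Stop() {
        e.shutdown = true
        e.running = false
    }

    // newDynamicExecutor mirrors the shape of NewDynamicExecutor: callers
    // only see the interface, not the concrete type.
    func newDynamicExecutor() Executor {
        return &dynamicExecutor{}
    }

    func main() {
        exec := newDynamicExecutor()
        // White-box checks, as in the updated test, need the concrete type,
        // so the interface value is asserted back to *dynamicExecutor.
        concrete, ok := exec.(*dynamicExecutor)
        if !ok {
            panic("unexpected concrete type")
        }
        fmt.Printf("running=%t shutdown=%t\n", concrete.running, concrete.shutdown)
    }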