Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/03 08:41:06 UTC

[plc4x] branch develop updated (e873ba0e7f -> 7d745dae3c)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from e873ba0e7f chore(build): first failure reporter should not fail
     new 60f560662d test(plc4go/cbus): small optimization regarding required errors
     new 7d745dae3c fix(plc4go/spi): make shutdown of WorkerPool more reliable

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/Browser_test.go    |  23 ++---
 plc4go/internal/cbus/Discoverer_test.go |  16 +---
 plc4go/spi/pool/WorkerPool.go           | 138 ++++++++++++++++++++++++------
 plc4go/spi/pool/WorkerPool_test.go      | 147 +++++++++++++++++++++++++++++++-
 4 files changed, 268 insertions(+), 56 deletions(-)


[plc4x] 01/02: test(plc4go/cbus): small optimization regarding required errors

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 60f560662d080876b974525f251ff4d28f23ecdc
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Sat Jun 3 10:39:25 2023 +0200

    test(plc4go/cbus): small optimization regarding required errors
---
 plc4go/internal/cbus/Browser_test.go    | 23 ++++++-----------------
 plc4go/internal/cbus/Discoverer_test.go | 16 ++++------------
 2 files changed, 10 insertions(+), 29 deletions(-)

diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 8fc96fc76f..f89593b75f 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -22,7 +22,6 @@ package cbus
 import (
 	"context"
 	"fmt"
-	"github.com/apache/plc4x/plc4go/spi/utils"
 	"net/url"
 	"sync/atomic"
 	"testing"
@@ -38,11 +37,13 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/apache/plc4x/plc4go/spi/transports"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	spiValues "github.com/apache/plc4x/plc4go/spi/values"
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 )
 
 func TestNewBrowser(t *testing.T) {
@@ -99,10 +100,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				t.Cleanup(func() {
 					assert.NoError(t, transportInstance.Close())
 				})
@@ -162,10 +160,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 					}
 				})
 				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				driver := NewDriver(loggerOption)
 				connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
 				if err := connectionConnectResult.GetErr(); err != nil {
@@ -380,10 +375,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
 				transport := test.NewTransport(loggerOption)
 				transportUrl := url.URL{Scheme: "test"}
 				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				t.Cleanup(func() {
 					assert.NoError(t, transportInstance.Close())
 				})
@@ -438,10 +430,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
 					}
 				})
 				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				connectionConnectResult := <-NewDriver(loggerOption).GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
 				if err := connectionConnectResult.GetErr(); err != nil {
 					t.Error(err)
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 5418525b15..fb5274ee2c 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -40,6 +40,7 @@ import (
 
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"golang.org/x/net/nettest"
 )
 
@@ -178,10 +179,7 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 			},
 			setup: func(t *testing.T, fields *fields, args *args) {
 				listen, err := net.Listen("tcp", "127.0.0.1:0")
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				go func() {
 					conn, err := listen.Accept()
 					if err != nil {
@@ -210,15 +208,9 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 				loggerOption := options.WithCustomLogger(logger)
 				transport := tcp.NewTransport(loggerOption)
 				parse, err := url.Parse("tcp://" + listen.Addr().String())
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				instance, err := transport.CreateTransportInstance(*parse, nil, loggerOption)
-				if err != nil {
-					t.Error(err)
-					t.FailNow()
-				}
+				require.NoError(t, err)
 				args.tcpTransportInstance = instance.(*tcp.TransportInstance)
 			},
 		},
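
Commit 01/02 replaces the repeated "if err != nil { t.Error(err); t.FailNow() }" blocks with testify's require.NoError, which reports the error and aborts the subtest in a single call. A minimal, self-contained sketch of that pattern (the test name and parsed URL are illustrative, not taken from the commit):

    package example

    import (
        "net/url"
        "testing"

        "github.com/stretchr/testify/require"
    )

    func TestParseTransportUrl(t *testing.T) {
        // Before: if err != nil { t.Error(err); t.FailNow() }
        // After: require.NoError marks the test failed and stops it immediately.
        parsed, err := url.Parse("tcp://127.0.0.1:0")
        require.NoError(t, err)
        require.Equal(t, "tcp", parsed.Scheme)
    }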


[plc4x] 02/02: fix(plc4go/spi): make shutdown of WorkerPool more reliable

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 7d745dae3c663ab5c7b4a26a4a444c78fc8433ed
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Sat Jun 3 10:40:58 2023 +0200

    fix(plc4go/spi): make shutdown of WorkerPool more reliable
---
 plc4go/spi/pool/WorkerPool.go      | 138 +++++++++++++++++++++++++++-------
 plc4go/spi/pool/WorkerPool_test.go | 147 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 258 insertions(+), 27 deletions(-)
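
The patch below makes two kinds of changes to the pool. One of them replaces the plain time.Sleep in the worker spawner/killer loops with a timer plus a shared interrupter channel, so Stop() can cut the wait short instead of sitting out a full up/down-scale interval. A stand-alone sketch of that interruptible wait, using only the standard library (the commit itself delegates timer teardown to the plc4go helper utils.CleanupTimer; the function and parameter names here are illustrative):

    package example

    import "time"

    // waitOrInterrupt sleeps for interval, but returns early (reporting true)
    // as soon as the interrupter channel is closed.
    func waitOrInterrupt(interval time.Duration, interrupter <-chan struct{}) (interrupted bool) {
        timer := time.NewTimer(interval)
        defer func() {
            // Rough equivalent of utils.CleanupTimer: stop the timer and
            // drain its channel if it already fired.
            if !timer.Stop() {
                select {
                case <-timer.C:
                default:
                }
            }
        }()
        select {
        case <-timer.C:
            return false
        case <-interrupter:
            return true
        }
    }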

diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index c9c031efa1..bcf143bef8 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"io"
@@ -42,6 +43,7 @@ type worker struct {
 	executor    interface {
 		isTraceWorkers() bool
 		getWorksItems() chan workItem
+		getWorkerWaitGroup() *sync.WaitGroup
 	}
 	hasEnded     atomic.Bool
 	lastReceived time.Time
@@ -58,6 +60,8 @@ func (w *worker) initialize() {
 }
 
 func (w *worker) work() {
+	w.executor.getWorkerWaitGroup().Add(1)
+	defer w.executor.getWorkerWaitGroup().Done()
 	defer func() {
 		if recovered := recover(); recovered != nil {
 			w.log.Error().Msgf("Recovering from panic():%v. Stack: %s", recovered, debug.Stack())
@@ -104,8 +108,8 @@ type workItem struct {
 	completionFuture *future
 }
 
-func (w *workItem) String() string {
-	return fmt.Sprintf("Workitem{wid:%d}", w.workItemId)
+func (w workItem) String() string {
+	return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
 }
 
 type Executor interface {
@@ -117,13 +121,15 @@ type Executor interface {
 }
 
 type executor struct {
-	maxNumberOfWorkers int
-	running            bool
-	shutdown           bool
-	stateChange        sync.Mutex
-	worker             []*worker
-	workItems          chan workItem
-	traceWorkers       bool
+	running      bool
+	shutdown     bool
+	stateChange  sync.Mutex
+	worker       []*worker
+	queueDepth   int
+	workItems    chan workItem
+	traceWorkers bool
+
+	workerWaitGroup sync.WaitGroup
 
 	log zerolog.Logger
 }
@@ -136,9 +142,19 @@ func (e *executor) getWorksItems() chan workItem {
 	return e.workItems
 }
 
+func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
+	return &e.workerWaitGroup
+}
+
 type dynamicExecutor struct {
-	executor
-	maxNumberOfWorkers int
+	*executor
+
+	maxNumberOfWorkers     int
+	currentNumberOfWorkers atomic.Int32
+	dynamicStateChange     sync.Mutex
+	interrupter            chan struct{}
+
+	dynamicWorkers sync.WaitGroup
 }
 
 func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
@@ -151,10 +167,10 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
 		}
 	}
 	_executor := &executor{
-		maxNumberOfWorkers: numberOfWorkers,
-		workItems:          make(chan workItem, queueDepth),
-		worker:             workers,
-		log:                customLogger,
+		queueDepth: queueDepth,
+		workItems:  make(chan workItem, queueDepth),
+		worker:     workers,
+		log:        customLogger,
 	}
 	for _, option := range _options {
 		switch option := option.(type) {
@@ -175,11 +191,10 @@ var timeToBecomeUnused = 5 * time.Second
 func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	customLogger := options.ExtractCustomLogger(_options...)
 	_executor := &dynamicExecutor{
-		executor: executor{
-			maxNumberOfWorkers: maxNumberOfWorkers,
-			workItems:          make(chan workItem, queueDepth),
-			worker:             make([]*worker, 0),
-			log:                customLogger,
+		executor: &executor{
+			workItems: make(chan workItem, queueDepth),
+			worker:    make([]*worker, 0),
+			log:       customLogger,
 		},
 		maxNumberOfWorkers: maxNumberOfWorkers,
 	}
@@ -240,6 +255,7 @@ func (e *executor) Start() {
 	e.stateChange.Lock()
 	defer e.stateChange.Unlock()
 	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
 		return
 	}
 	e.running = true
@@ -252,10 +268,25 @@ func (e *executor) Start() {
 }
 
 func (e *dynamicExecutor) Start() {
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	if e.interrupter != nil {
+		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
+		close(e.interrupter)
+		e.dynamicWorkers.Wait()
+	}
+
 	e.executor.Start()
 	mutex := sync.Mutex{}
+	e.interrupter = make(chan struct{})
 	// Worker spawner
 	go func() {
+		e.dynamicWorkers.Add(1)
+		defer e.dynamicWorkers.Done()
 		defer func() {
 			if err := recover(); err != nil {
 				e.log.Error().Msgf("panic-ed %v", err)
@@ -266,13 +297,13 @@ func (e *dynamicExecutor) Start() {
 			workerLog = zerolog.Nop()
 		}
 		for e.running && !e.shutdown {
-			workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
-			time.Sleep(upScaleInterval)
+			workerLog.Trace().Msg("running")
 			mutex.Lock()
 			numberOfItemsInQueue := len(e.workItems)
 			numberOfWorkers := len(e.worker)
-			workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+			workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
 			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+				workerLog.Trace().Msg("spawning new worker")
 				_worker := &worker{
 					id:           numberOfWorkers - 1,
 					interrupter:  make(chan struct{}, 1),
@@ -284,14 +315,28 @@ func (e *dynamicExecutor) Start() {
 				_worker.initialize()
 				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
 				go _worker.work()
+				e.currentNumberOfWorkers.Add(1)
 			} else {
 				workerLog.Trace().Msg("Nothing to scale")
 			}
 			mutex.Unlock()
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+				timer := time.NewTimer(upScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
 		}
+		workerLog.Info().Msg("Terminated")
 	}()
 	// Worker killer
 	go func() {
+		e.dynamicWorkers.Add(1)
+		defer e.dynamicWorkers.Done()
 		defer func() {
 			if err := recover(); err != nil {
 				e.log.Error().Msgf("panic-ed %v", err)
@@ -302,8 +347,7 @@ func (e *dynamicExecutor) Start() {
 			workerLog = zerolog.Nop()
 		}
 		for e.running && !e.shutdown {
-			workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
-			time.Sleep(downScaleInterval)
+			workerLog.Trace().Msg("running")
 			mutex.Lock()
 			newWorkers := make([]*worker, 0)
 			for _, _worker := range e.worker {
@@ -313,6 +357,7 @@ func (e *dynamicExecutor) Start() {
 					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
 					_worker.interrupted.Store(true)
 					close(_worker.interrupter)
+					e.currentNumberOfWorkers.Add(-1)
 				} else {
 					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
 					newWorkers = append(newWorkers, _worker)
@@ -320,14 +365,27 @@ func (e *dynamicExecutor) Start() {
 			}
 			e.worker = newWorkers
 			mutex.Unlock()
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+				timer := time.NewTimer(downScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
 		}
+		workerLog.Info().Msg("Terminated")
 	}()
 }
 
 func (e *executor) Stop() {
+	e.log.Trace().Msg("stopping now")
 	e.stateChange.Lock()
 	defer e.stateChange.Unlock()
 	if !e.running || e.shutdown {
+		e.log.Warn().Msg("already stopped")
 		return
 	}
 	e.shutdown = true
@@ -337,9 +395,27 @@ func (e *executor) Stop() {
 		worker.interrupted.Store(true)
 		close(worker.interrupter)
 	}
-	close(e.workItems)
 	e.running = false
 	e.shutdown = false
+	e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
+	e.workerWaitGroup.Wait()
+	e.log.Trace().Msg("stopped")
+}
+
+func (e *dynamicExecutor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if !e.running || e.shutdown {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	close(e.interrupter)
+	e.log.Trace().Msg("stopping inner executor")
+	e.executor.Stop()
+	e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
+	e.dynamicWorkers.Wait()
+	e.log.Trace().Msg("stopped")
 }
 
 func (e *executor) Close() error {
@@ -395,3 +471,13 @@ func (f *future) AwaitCompletion(ctx context.Context) error {
 	}
 	return nil
 }
+
+func (f *future) String() string {
+	return fmt.Sprintf("future: cancelRequested(%t), interruptRequested(%t), completed(%t), errored(%t), err(%v)",
+		f.cancelRequested.Load(),
+		f.interruptRequested.Load(),
+		f.completed.Load(),
+		f.errored.Load(),
+		f.err.Load(),
+	)
+}
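
The other half of the WorkerPool.go change above is the shutdown path: every worker registers on a sync.WaitGroup, and Stop() no longer closes the work-item channel but instead closes the interrupter channels and then waits on that group (the dynamic executor additionally waits for its spawner/killer goroutines). A stripped-down sketch of that shutdown shape, with placeholder types rather than the real executor/worker structs:

    package example

    import "sync"

    // pool is a toy stand-in for the executor: workers register on a
    // WaitGroup so Stop can block until every worker goroutine has returned.
    type pool struct {
        interrupter chan struct{}
        workers     sync.WaitGroup
        workItems   chan func()
    }

    func newPool(numberOfWorkers, queueDepth int) *pool {
        p := &pool{
            interrupter: make(chan struct{}),
            workItems:   make(chan func(), queueDepth),
        }
        for i := 0; i < numberOfWorkers; i++ {
            p.workers.Add(1) // register before the goroutine starts
            go p.work()
        }
        return p
    }

    func (p *pool) work() {
        defer p.workers.Done()
        for {
            select {
            case <-p.interrupter:
                return
            case item := <-p.workItems:
                item()
            }
        }
    }

    func (p *pool) Stop() {
        close(p.interrupter) // signal every worker
        p.workers.Wait()     // block until all workers have exited
    }

Note that the sketch calls Add(1) before launching each goroutine, while the committed code does the Add/Done pair at the top of worker.work() and inside the spawner/killer goroutines themselves.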
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index b084e61d56..9a62a09d25 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -420,7 +420,7 @@ func TestWorkItem_String(t *testing.T) {
 	}{
 		{
 			name: "Simple test",
-			want: "Workitem{wid:0}",
+			want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
 		},
 	}
 	for _, tt := range tests {
@@ -743,6 +743,151 @@ func Test_future_complete(t *testing.T) {
 	}
 }
 
+func Test_dynamicExecutor_Start(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+	}
+	tests := []struct {
+		name       string
+		fields     fields
+		setup      func(t *testing.T, fields *fields)
+		startTwice bool
+	}{
+		{
+			name: "just start",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "start twice",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			startTwice: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+			}
+			e.Start()
+			if tt.startTwice {
+				e.Start()
+			}
+			// Let it work a bit
+			time.Sleep(20 * time.Millisecond)
+			t.Log("done with test")
+			t.Cleanup(e.Stop)
+		})
+	}
+}
+
+func Test_dynamicExecutor_Stop(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+		interrupter        chan struct{}
+	}
+	tests := []struct {
+		name      string
+		fields    fields
+		setup     func(t *testing.T, fields *fields)
+		startIt   bool
+		stopTwice bool
+	}{
+		{
+			name: "just stop",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "stop started",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+		},
+		{
+			name: "stop twice",
+			fields: fields{
+				executor: &executor{
+					workItems:    make(chan workItem, 1),
+					worker:       make([]*worker, 0),
+					traceWorkers: true,
+				},
+				maxNumberOfWorkers: 100,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.executor.log = produceTestLogger(t)
+				fields.executor.workItems <- workItem{1, func() {}, &future{}}
+			},
+			stopTwice: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			e := &dynamicExecutor{
+				executor:           tt.fields.executor,
+				maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+				interrupter:        tt.fields.interrupter,
+			}
+			if tt.startIt {
+				e.Start()
+			}
+			e.Stop()
+			if tt.stopTwice {
+				e.Stop()
+			}
+		})
+	}
+}
+
 // from: https://github.com/golang/go/issues/36532#issuecomment-575535452
 func testContext(t *testing.T) context.Context {
 	ctx, cancel := context.WithCancel(context.Background())