You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/03 08:41:06 UTC
[plc4x] branch develop updated (e873ba0e7f -> 7d745dae3c)
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
from e873ba0e7f chore(build): first failure reporter should not fail
new 60f560662d test(plc4go/cbus): small optimization regarding required errors
new 7d745dae3c fix(plc4go/spi): make shutdown of WorkerPool more reliable
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
plc4go/internal/cbus/Browser_test.go | 23 ++---
plc4go/internal/cbus/Discoverer_test.go | 16 +---
plc4go/spi/pool/WorkerPool.go | 138 ++++++++++++++++++++++++------
plc4go/spi/pool/WorkerPool_test.go | 147 +++++++++++++++++++++++++++++++-
4 files changed, 268 insertions(+), 56 deletions(-)
[plc4x] 01/02: test(plc4go/cbus): small optimization regarding required errors
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 60f560662d080876b974525f251ff4d28f23ecdc
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Sat Jun 3 10:39:25 2023 +0200
test(plc4go/cbus): small optimization regarding required errors
---
plc4go/internal/cbus/Browser_test.go | 23 ++++++-----------------
plc4go/internal/cbus/Discoverer_test.go | 16 ++++------------
2 files changed, 10 insertions(+), 29 deletions(-)
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 8fc96fc76f..f89593b75f 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -22,7 +22,6 @@ package cbus
import (
"context"
"fmt"
- "github.com/apache/plc4x/plc4go/spi/utils"
"net/url"
"sync/atomic"
"testing"
@@ -38,11 +37,13 @@ import (
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
+ "github.com/apache/plc4x/plc4go/spi/utils"
spiValues "github.com/apache/plc4x/plc4go/spi/values"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
)
func TestNewBrowser(t *testing.T) {
@@ -99,10 +100,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, transportInstance.Close())
})
@@ -162,10 +160,7 @@ func TestBrowser_BrowseQuery(t *testing.T) {
}
})
err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
driver := NewDriver(loggerOption)
connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
if err := connectionConnectResult.GetErr(); err != nil {
@@ -380,10 +375,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
transport := test.NewTransport(loggerOption)
transportUrl := url.URL{Scheme: "test"}
transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
t.Cleanup(func() {
assert.NoError(t, transportInstance.Close())
})
@@ -438,10 +430,7 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
}
})
err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
connectionConnectResult := <-NewDriver(loggerOption).GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
if err := connectionConnectResult.GetErr(); err != nil {
t.Error(err)
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 5418525b15..fb5274ee2c 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -40,6 +40,7 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"golang.org/x/net/nettest"
)
@@ -178,10 +179,7 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
},
setup: func(t *testing.T, fields *fields, args *args) {
listen, err := net.Listen("tcp", "127.0.0.1:0")
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
go func() {
conn, err := listen.Accept()
if err != nil {
@@ -210,15 +208,9 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
loggerOption := options.WithCustomLogger(logger)
transport := tcp.NewTransport(loggerOption)
parse, err := url.Parse("tcp://" + listen.Addr().String())
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
instance, err := transport.CreateTransportInstance(*parse, nil, loggerOption)
- if err != nil {
- t.Error(err)
- t.FailNow()
- }
+ require.NoError(t, err)
args.tcpTransportInstance = instance.(*tcp.TransportInstance)
},
},
[plc4x] 02/02: fix(plc4go/spi): make shutdown of WorkerPool more reliable
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 7d745dae3c663ab5c7b4a26a4a444c78fc8433ed
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Sat Jun 3 10:40:58 2023 +0200
fix(plc4go/spi): make shutdown of WorkerPool more reliable
---
plc4go/spi/pool/WorkerPool.go | 138 +++++++++++++++++++++++++++-------
plc4go/spi/pool/WorkerPool_test.go | 147 ++++++++++++++++++++++++++++++++++++-
2 files changed, 258 insertions(+), 27 deletions(-)
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index c9c031efa1..bcf143bef8 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/options"
+ "github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"io"
@@ -42,6 +43,7 @@ type worker struct {
executor interface {
isTraceWorkers() bool
getWorksItems() chan workItem
+ getWorkerWaitGroup() *sync.WaitGroup
}
hasEnded atomic.Bool
lastReceived time.Time
@@ -58,6 +60,8 @@ func (w *worker) initialize() {
}
func (w *worker) work() {
+ w.executor.getWorkerWaitGroup().Add(1)
+ defer w.executor.getWorkerWaitGroup().Done()
defer func() {
if recovered := recover(); recovered != nil {
w.log.Error().Msgf("Recovering from panic():%v. Stack: %s", recovered, debug.Stack())
@@ -104,8 +108,8 @@ type workItem struct {
completionFuture *future
}
-func (w *workItem) String() string {
- return fmt.Sprintf("Workitem{wid:%d}", w.workItemId)
+func (w workItem) String() string {
+ return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
}
type Executor interface {
@@ -117,13 +121,15 @@ type Executor interface {
}
type executor struct {
- maxNumberOfWorkers int
- running bool
- shutdown bool
- stateChange sync.Mutex
- worker []*worker
- workItems chan workItem
- traceWorkers bool
+ running bool
+ shutdown bool
+ stateChange sync.Mutex
+ worker []*worker
+ queueDepth int
+ workItems chan workItem
+ traceWorkers bool
+
+ workerWaitGroup sync.WaitGroup
log zerolog.Logger
}
@@ -136,9 +142,19 @@ func (e *executor) getWorksItems() chan workItem {
return e.workItems
}
+func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
+ return &e.workerWaitGroup
+}
+
type dynamicExecutor struct {
- executor
- maxNumberOfWorkers int
+ *executor
+
+ maxNumberOfWorkers int
+ currentNumberOfWorkers atomic.Int32
+ dynamicStateChange sync.Mutex
+ interrupter chan struct{}
+
+ dynamicWorkers sync.WaitGroup
}
func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
@@ -151,10 +167,10 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
}
}
_executor := &executor{
- maxNumberOfWorkers: numberOfWorkers,
- workItems: make(chan workItem, queueDepth),
- worker: workers,
- log: customLogger,
+ queueDepth: queueDepth,
+ workItems: make(chan workItem, queueDepth),
+ worker: workers,
+ log: customLogger,
}
for _, option := range _options {
switch option := option.(type) {
@@ -175,11 +191,10 @@ var timeToBecomeUnused = 5 * time.Second
func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
customLogger := options.ExtractCustomLogger(_options...)
_executor := &dynamicExecutor{
- executor: executor{
- maxNumberOfWorkers: maxNumberOfWorkers,
- workItems: make(chan workItem, queueDepth),
- worker: make([]*worker, 0),
- log: customLogger,
+ executor: &executor{
+ workItems: make(chan workItem, queueDepth),
+ worker: make([]*worker, 0),
+ log: customLogger,
},
maxNumberOfWorkers: maxNumberOfWorkers,
}
@@ -240,6 +255,7 @@ func (e *executor) Start() {
e.stateChange.Lock()
defer e.stateChange.Unlock()
if e.running || e.shutdown {
+ e.log.Warn().Msg("Already started")
return
}
e.running = true
@@ -252,10 +268,25 @@ func (e *executor) Start() {
}
func (e *dynamicExecutor) Start() {
+ e.dynamicStateChange.Lock()
+ defer e.dynamicStateChange.Unlock()
+ if e.running || e.shutdown {
+ e.log.Warn().Msg("Already started")
+ return
+ }
+ if e.interrupter != nil {
+ e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
+ close(e.interrupter)
+ e.dynamicWorkers.Wait()
+ }
+
e.executor.Start()
mutex := sync.Mutex{}
+ e.interrupter = make(chan struct{})
// Worker spawner
go func() {
+ e.dynamicWorkers.Add(1)
+ defer e.dynamicWorkers.Done()
defer func() {
if err := recover(); err != nil {
e.log.Error().Msgf("panic-ed %v", err)
@@ -266,13 +297,13 @@ func (e *dynamicExecutor) Start() {
workerLog = zerolog.Nop()
}
for e.running && !e.shutdown {
- workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
- time.Sleep(upScaleInterval)
+ workerLog.Trace().Msg("running")
mutex.Lock()
numberOfItemsInQueue := len(e.workItems)
numberOfWorkers := len(e.worker)
- workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+ workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+ workerLog.Trace().Msg("spawning new worker")
_worker := &worker{
id: numberOfWorkers - 1,
interrupter: make(chan struct{}, 1),
@@ -284,14 +315,28 @@ func (e *dynamicExecutor) Start() {
_worker.initialize()
workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
go _worker.work()
+ e.currentNumberOfWorkers.Add(1)
} else {
workerLog.Trace().Msg("Nothing to scale")
}
mutex.Unlock()
+ func() {
+ workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+ timer := time.NewTimer(upScaleInterval)
+ defer utils.CleanupTimer(timer)
+ select {
+ case <-timer.C:
+ case <-e.interrupter:
+ workerLog.Info().Msg("interrupted")
+ }
+ }()
}
+ workerLog.Info().Msg("Terminated")
}()
// Worker killer
go func() {
+ e.dynamicWorkers.Add(1)
+ defer e.dynamicWorkers.Done()
defer func() {
if err := recover(); err != nil {
e.log.Error().Msgf("panic-ed %v", err)
@@ -302,8 +347,7 @@ func (e *dynamicExecutor) Start() {
workerLog = zerolog.Nop()
}
for e.running && !e.shutdown {
- workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
- time.Sleep(downScaleInterval)
+ workerLog.Trace().Msg("running")
mutex.Lock()
newWorkers := make([]*worker, 0)
for _, _worker := range e.worker {
@@ -313,6 +357,7 @@ func (e *dynamicExecutor) Start() {
workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
_worker.interrupted.Store(true)
close(_worker.interrupter)
+ e.currentNumberOfWorkers.Add(-1)
} else {
workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
newWorkers = append(newWorkers, _worker)
@@ -320,14 +365,27 @@ func (e *dynamicExecutor) Start() {
}
e.worker = newWorkers
mutex.Unlock()
+ func() {
+ workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+ timer := time.NewTimer(downScaleInterval)
+ defer utils.CleanupTimer(timer)
+ select {
+ case <-timer.C:
+ case <-e.interrupter:
+ workerLog.Info().Msg("interrupted")
+ }
+ }()
}
+ workerLog.Info().Msg("Terminated")
}()
}
func (e *executor) Stop() {
+ e.log.Trace().Msg("stopping now")
e.stateChange.Lock()
defer e.stateChange.Unlock()
if !e.running || e.shutdown {
+ e.log.Warn().Msg("already stopped")
return
}
e.shutdown = true
@@ -337,9 +395,27 @@ func (e *executor) Stop() {
worker.interrupted.Store(true)
close(worker.interrupter)
}
- close(e.workItems)
e.running = false
e.shutdown = false
+ e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
+ e.workerWaitGroup.Wait()
+ e.log.Trace().Msg("stopped")
+}
+
+func (e *dynamicExecutor) Stop() {
+ e.log.Trace().Msg("stopping now")
+ e.dynamicStateChange.Lock()
+ defer e.dynamicStateChange.Unlock()
+ if !e.running || e.shutdown {
+ e.log.Warn().Msg("already stopped")
+ return
+ }
+ close(e.interrupter)
+ e.log.Trace().Msg("stopping inner executor")
+ e.executor.Stop()
+ e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
+ e.dynamicWorkers.Wait()
+ e.log.Trace().Msg("stopped")
}
func (e *executor) Close() error {
@@ -395,3 +471,13 @@ func (f *future) AwaitCompletion(ctx context.Context) error {
}
return nil
}
+
+func (f *future) String() string {
+ return fmt.Sprintf("future: cancelRequested(%t), interruptRequested(%t), completed(%t), errored(%t), err(%v)",
+ f.cancelRequested.Load(),
+ f.interruptRequested.Load(),
+ f.completed.Load(),
+ f.errored.Load(),
+ f.err.Load(),
+ )
+}
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index b084e61d56..9a62a09d25 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -420,7 +420,7 @@ func TestWorkItem_String(t *testing.T) {
}{
{
name: "Simple test",
- want: "Workitem{wid:0}",
+ want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
},
}
for _, tt := range tests {
@@ -743,6 +743,151 @@ func Test_future_complete(t *testing.T) {
}
}
+func Test_dynamicExecutor_Start(t *testing.T) {
+ type fields struct {
+ executor *executor
+ maxNumberOfWorkers int
+ }
+ tests := []struct {
+ name string
+ fields fields
+ setup func(t *testing.T, fields *fields)
+ startTwice bool
+ }{
+ {
+ name: "just start",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "start twice",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ startTwice: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
+ e := &dynamicExecutor{
+ executor: tt.fields.executor,
+ maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+ }
+ e.Start()
+ if tt.startTwice {
+ e.Start()
+ }
+ // Let it work a bit
+ time.Sleep(20 * time.Millisecond)
+ t.Log("done with test")
+ t.Cleanup(e.Stop)
+ })
+ }
+}
+
+func Test_dynamicExecutor_Stop(t *testing.T) {
+ type fields struct {
+ executor *executor
+ maxNumberOfWorkers int
+ interrupter chan struct{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ setup func(t *testing.T, fields *fields)
+ startIt bool
+ stopTwice bool
+ }{
+ {
+ name: "just stop",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "stop started",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "stop twice",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ stopTwice: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
+ e := &dynamicExecutor{
+ executor: tt.fields.executor,
+ maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+ interrupter: tt.fields.interrupter,
+ }
+ if tt.startIt {
+ e.Start()
+ }
+ e.Stop()
+ if tt.stopTwice {
+ e.Stop()
+ }
+ })
+ }
+}
+
// from: https://github.com/golang/go/issues/36532#issuecomment-575535452
func testContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())