You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/28 14:52:44 UTC
[plc4x] branch develop updated: refactor(plc4go/spi): slight cleanup of pool
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 3ea774cd4f refactor(plc4go/spi): slight cleanup of pool
3ea774cd4f is described below
commit 3ea774cd4fa8a87bbd177c70401123157a1cc396
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Jun 28 16:52:32 2023 +0200
refactor(plc4go/spi): slight cleanup of pool
---
plc4go/spi/pool/WorkerPool.go | 16 +----
plc4go/spi/pool/dynamicExecutor.go | 7 +++
plc4go/spi/pool/dynamicExecutor_test.go | 45 ++++++++++++--
plc4go/spi/pool/executor.go | 9 ++-
plc4go/spi/pool/executor_plc4xgen.go | 4 --
plc4go/spi/pool/executor_test.go | 72 ++++++++++++++--------
.../transactions/RequestTransactionManager_test.go | 6 +-
7 files changed, 107 insertions(+), 52 deletions(-)
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index d19da618b5..b5d781fb82 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -54,12 +54,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
w.lastReceived.Store(time.Time{})
workers[i] = &w
}
- _executor := &executor{
- queueDepth: queueDepth,
- workItems: make(chan workItem, queueDepth),
- worker: workers,
- log: customLogger,
- }
+ _executor := newExecutor(queueDepth, workers, customLogger)
_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
for i := 0; i < numberOfWorkers; i++ {
workers[i].executor = _executor
@@ -69,14 +64,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
- _executor := &dynamicExecutor{
- executor: &executor{
- workItems: make(chan workItem, queueDepth),
- worker: make([]*worker, 0),
- log: customLogger,
- },
- maxNumberOfWorkers: maxNumberOfWorkers,
- }
+ _executor := newDynamicExecutor(queueDepth, maxNumberOfWorkers, customLogger)
_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
// We spawn one initial worker
w := worker{
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index e2cf3afb56..630cabee08 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -44,6 +44,13 @@ type dynamicExecutor struct {
dynamicWorkers sync.WaitGroup
}
+func newDynamicExecutor(queueDepth, maxNumberOfWorkers int, log zerolog.Logger) *dynamicExecutor {
+ return &dynamicExecutor{
+ executor: newExecutor(queueDepth, make([]*worker, 0), log),
+ maxNumberOfWorkers: maxNumberOfWorkers,
+ }
+}
+
func (e *dynamicExecutor) Start() {
e.dynamicStateChange.Lock()
defer e.dynamicStateChange.Unlock()
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index e58ffd0f13..482c9e729d 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -20,12 +20,49 @@
package pool
import (
- "github.com/stretchr/testify/assert"
"sync/atomic"
"testing"
"time"
+
+ "github.com/rs/zerolog"
+ "github.com/stretchr/testify/assert"
)
+func Test_newDynamicExecutor(t *testing.T) {
+ type args struct {
+ queueDepth int
+ maxNumberOfWorkers int
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ args args
+ want *dynamicExecutor
+ manipulator func(t *testing.T, want *dynamicExecutor, got *dynamicExecutor)
+ }{
+ {
+ name: "just create it",
+ want: &dynamicExecutor{
+ executor: newExecutor(0, make([]*worker, 0), zerolog.Logger{}),
+ },
+ manipulator: func(t *testing.T, want *dynamicExecutor, got *dynamicExecutor) {
+ assert.NotNil(t, got.workItems)
+ want.workItems = got.workItems
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := newDynamicExecutor(tt.args.queueDepth, tt.args.maxNumberOfWorkers, tt.args.log)
+ want := tt.want
+ if tt.manipulator != nil {
+ tt.manipulator(t, want, got)
+ }
+ assert.Equalf(t, want, got, "newDynamicExecutor(%v, %v, %v)", tt.args.queueDepth, tt.args.maxNumberOfWorkers, tt.args.log)
+ })
+ }
+}
+
func Test_dynamicExecutor_Start(t *testing.T) {
type fields struct {
executor *executor
@@ -208,9 +245,9 @@ func Test_dynamicExecutor_String(t *testing.T) {
║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║║
║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║
║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║║
-║║║0x0000000000000000 0║║0 element(s)║║ b0 false ║ ║║
-║║╚════════════════════╝╚════════════╝╚═════════════╝ ║║
+║║╔═workItems══╗╔═traceWorkers╗ ║║
+║║║0 element(s)║║ b0 false ║ ║║
+║║╚════════════╝╚═════════════╝ ║║
║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗ ║
║║0x0000000000000003 3║║ 0x00000000 0 ║║0 element(s)║ ║
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 403408f9ca..e3c83917a2 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -34,7 +34,6 @@ type executor struct {
shutdown bool
worker []*worker
- queueDepth int
workItems chan workItem
traceWorkers bool
@@ -44,6 +43,14 @@ type executor struct {
log zerolog.Logger `ignore:"true"`
}
+func newExecutor(queueDepth int, workers []*worker, log zerolog.Logger) *executor {
+ return &executor{
+ workItems: make(chan workItem, queueDepth),
+ worker: workers,
+ log: log,
+ }
+}
+
func (e *executor) isTraceWorkers() bool {
return e.traceWorkers
}
diff --git a/plc4go/spi/pool/executor_plc4xgen.go b/plc4go/spi/pool/executor_plc4xgen.go
index 7278d348a0..8a914e3ac6 100644
--- a/plc4go/spi/pool/executor_plc4xgen.go
+++ b/plc4go/spi/pool/executor_plc4xgen.go
@@ -79,10 +79,6 @@ func (d *executor) SerializeWithWriteBuffer(ctx context.Context, writeBuffer uti
return err
}
- if err := writeBuffer.WriteInt64("queueDepth", 64, int64(d.queueDepth)); err != nil {
- return err
- }
-
_workItems_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.workItems))
if err := writeBuffer.WriteString("workItems", uint32(len(_workItems_plx4gen_description)*8), "UTF-8", _workItems_plx4gen_description); err != nil {
return err
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 15ecac74ca..3f575ca75f 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -31,12 +31,44 @@ import (
"github.com/stretchr/testify/assert"
)
+func Test_newExecutor(t *testing.T) {
+ type args struct {
+ queueDepth int
+ workers []*worker
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ args args
+ want *executor
+ manipulator func(t *testing.T, want *executor, got *executor)
+ }{
+ {
+ name: "just create it",
+ want: &executor{},
+ manipulator: func(t *testing.T, want *executor, got *executor) {
+ assert.NotNil(t, got.workItems)
+ want.workItems = got.workItems
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got := newExecutor(tt.args.queueDepth, tt.args.workers, tt.args.log)
+ want := tt.want
+ if tt.manipulator != nil {
+ tt.manipulator(t, want, got)
+ }
+ assert.Equalf(t, want, got, "newExecutor(%v, %v, %v)", tt.args.queueDepth, tt.args.workers, tt.args.log)
+ })
+ }
+}
+
func Test_executor_Close(t *testing.T) {
type fields struct {
running bool
shutdown bool
worker []*worker
- queueDepth int
workItems chan workItem
traceWorkers bool
}
@@ -56,7 +88,6 @@ func Test_executor_Close(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
@@ -71,7 +102,6 @@ func Test_executor_IsRunning(t *testing.T) {
running bool
shutdown bool
worker []*worker
- queueDepth int
workItems chan workItem
traceWorkers bool
}
@@ -90,7 +120,6 @@ func Test_executor_IsRunning(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
@@ -311,7 +340,6 @@ func Test_executor_getWorkerWaitGroup(t *testing.T) {
running bool
shutdown bool
worker []*worker
- queueDepth int
workItems chan workItem
traceWorkers bool
}
@@ -331,7 +359,6 @@ func Test_executor_getWorkerWaitGroup(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
@@ -346,7 +373,6 @@ func Test_executor_getWorksItems(t *testing.T) {
running bool
shutdown bool
worker []*worker
- queueDepth int
workItems chan workItem
traceWorkers bool
log zerolog.Logger
@@ -366,7 +392,6 @@ func Test_executor_getWorksItems(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: tt.fields.log,
@@ -381,7 +406,6 @@ func Test_executor_isTraceWorkers(t *testing.T) {
running bool
shutdown bool
worker []*worker
- queueDepth int
workItems chan workItem
traceWorkers bool
}
@@ -400,7 +424,6 @@ func Test_executor_isTraceWorkers(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
@@ -415,7 +438,6 @@ func Test_executor_String(t *testing.T) {
running bool
shutdown bool
worker []*worker
- queueDepth int
workItems chan workItem
traceWorkers bool
}
@@ -441,23 +463,22 @@ func Test_executor_String(t *testing.T) {
}(),
},
},
- queueDepth: 2,
traceWorkers: true,
},
want: `
-╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═running╗╔═shutdown╗ ║
-║║b1 true ║║ b1 true ║ ║
-║╚════════╝╚═════════╝ ║
-║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║
-║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║
-║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║
-║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║
-║║0x0000000000000002 2║║0 element(s)║║ b1 true ║ ║
-║╚════════════════════╝╚════════════╝╚═════════════╝ ║
-╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═executor═════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═running╗╔═shutdown╗ ║
+║║b1 true ║║ b1 true ║ ║
+║╚════════╝╚═════════╝ ║
+║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗╔═workItems══╗║
+║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║0 element(s)║║
+║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║╚════════════╝║
+║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║ ║
+║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
+║╔═traceWorkers╗ ║
+║║ b1 true ║ ║
+║╚═════════════╝ ║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
},
}
for _, tt := range tests {
@@ -466,7 +487,6 @@ func Test_executor_String(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queueDepth: tt.fields.queueDepth,
workItems: tt.fields.workItems,
traceWorkers: tt.fields.traceWorkers,
log: produceTestingLogger(t),
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index acf3bfae09..276bf8178d 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -654,9 +654,9 @@ func Test_requestTransactionManager_String(t *testing.T) {
║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║ b0 false ║║0 element(s)║║║ ║
║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║ ║
║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║ ║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗ ║ ║
-║║║0x0000000000000001 1║║0 element(s)║║ b0 false ║ ║ ║
-║║╚════════════════════╝╚════════════╝╚═════════════╝ ║ ║
+║║╔═workItems══╗╔═traceWorkers╗ ║ ║
+║║║0 element(s)║║ b0 false ║ ║ ║
+║║╚════════════╝╚═════════════╝ ║ ║
║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝ ║
║╔═traceTransactionManagerTransactions╗ ║
║║ b1 true ║ ║