You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/28 14:52:44 UTC

[plc4x] branch develop updated: refactor(plc4go/spi): slight cleanup of pool

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3ea774cd4f refactor(plc4go/spi): slight cleanup of pool
3ea774cd4f is described below

commit 3ea774cd4fa8a87bbd177c70401123157a1cc396
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Jun 28 16:52:32 2023 +0200

    refactor(plc4go/spi): slight cleanup of pool
---
 plc4go/spi/pool/WorkerPool.go                      | 16 +----
 plc4go/spi/pool/dynamicExecutor.go                 |  7 +++
 plc4go/spi/pool/dynamicExecutor_test.go            | 45 ++++++++++++--
 plc4go/spi/pool/executor.go                        |  9 ++-
 plc4go/spi/pool/executor_plc4xgen.go               |  4 --
 plc4go/spi/pool/executor_test.go                   | 72 ++++++++++++++--------
 .../transactions/RequestTransactionManager_test.go |  6 +-
 7 files changed, 107 insertions(+), 52 deletions(-)

diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index d19da618b5..b5d781fb82 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -54,12 +54,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
 		w.lastReceived.Store(time.Time{})
 		workers[i] = &w
 	}
-	_executor := &executor{
-		queueDepth: queueDepth,
-		workItems:  make(chan workItem, queueDepth),
-		worker:     workers,
-		log:        customLogger,
-	}
+	_executor := newExecutor(queueDepth, workers, customLogger)
 	_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
 	for i := 0; i < numberOfWorkers; i++ {
 		workers[i].executor = _executor
@@ -69,14 +64,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
 
 func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	customLogger := options.ExtractCustomLoggerOrDefaultToGlobal(_options...)
-	_executor := &dynamicExecutor{
-		executor: &executor{
-			workItems: make(chan workItem, queueDepth),
-			worker:    make([]*worker, 0),
-			log:       customLogger,
-		},
-		maxNumberOfWorkers: maxNumberOfWorkers,
-	}
+	_executor := newDynamicExecutor(queueDepth, maxNumberOfWorkers, customLogger)
 	_executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
 	// We spawn one initial worker
 	w := worker{
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index e2cf3afb56..630cabee08 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -44,6 +44,13 @@ type dynamicExecutor struct {
 	dynamicWorkers sync.WaitGroup
 }
 
+func newDynamicExecutor(queueDepth, maxNumberOfWorkers int, log zerolog.Logger) *dynamicExecutor {
+	return &dynamicExecutor{
+		executor:           newExecutor(queueDepth, make([]*worker, 0), log),
+		maxNumberOfWorkers: maxNumberOfWorkers,
+	}
+}
+
 func (e *dynamicExecutor) Start() {
 	e.dynamicStateChange.Lock()
 	defer e.dynamicStateChange.Unlock()
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index e58ffd0f13..482c9e729d 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -20,12 +20,49 @@
 package pool
 
 import (
-	"github.com/stretchr/testify/assert"
 	"sync/atomic"
 	"testing"
 	"time"
+
+	"github.com/rs/zerolog"
+	"github.com/stretchr/testify/assert"
 )
 
+func Test_newDynamicExecutor(t *testing.T) {
+	type args struct {
+		queueDepth         int
+		maxNumberOfWorkers int
+		log                zerolog.Logger
+	}
+	tests := []struct {
+		name        string
+		args        args
+		want        *dynamicExecutor
+		manipulator func(t *testing.T, want *dynamicExecutor, got *dynamicExecutor)
+	}{
+		{
+			name: "just create it",
+			want: &dynamicExecutor{
+				executor: newExecutor(0, make([]*worker, 0), zerolog.Logger{}),
+			},
+			manipulator: func(t *testing.T, want *dynamicExecutor, got *dynamicExecutor) {
+				assert.NotNil(t, got.workItems)
+				want.workItems = got.workItems
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := newDynamicExecutor(tt.args.queueDepth, tt.args.maxNumberOfWorkers, tt.args.log)
+			want := tt.want
+			if tt.manipulator != nil {
+				tt.manipulator(t, want, got)
+			}
+			assert.Equalf(t, want, got, "newDynamicExecutor(%v, %v, %v)", tt.args.queueDepth, tt.args.maxNumberOfWorkers, tt.args.log)
+		})
+	}
+}
+
 func Test_dynamicExecutor_Start(t *testing.T) {
 	type fields struct {
 		executor           *executor
@@ -208,9 +245,9 @@ func Test_dynamicExecutor_String(t *testing.T) {
 ║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║║
 ║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║║
 ║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║║
-║║║0x0000000000000000 0║║0 element(s)║║  b0 false   ║                                                     ║║
-║║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║║
+║║╔═workItems══╗╔═traceWorkers╗                                                                           ║║
+║║║0 element(s)║║  b0 false   ║                                                                           ║║
+║║╚════════════╝╚═════════════╝                                                                           ║║
 ║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝║
 ║╔═maxNumberOfWorkers═╗╔═currentNumberOfWorkers╗╔═interrupter╗                                             ║
 ║║0x0000000000000003 3║║     0x00000000 0      ║║0 element(s)║                                             ║
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 403408f9ca..e3c83917a2 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -34,7 +34,6 @@ type executor struct {
 	shutdown bool
 
 	worker       []*worker
-	queueDepth   int
 	workItems    chan workItem
 	traceWorkers bool
 
@@ -44,6 +43,14 @@ type executor struct {
 	log zerolog.Logger `ignore:"true"`
 }
 
+func newExecutor(queueDepth int, workers []*worker, log zerolog.Logger) *executor {
+	return &executor{
+		workItems: make(chan workItem, queueDepth),
+		worker:    workers,
+		log:       log,
+	}
+}
+
 func (e *executor) isTraceWorkers() bool {
 	return e.traceWorkers
 }
diff --git a/plc4go/spi/pool/executor_plc4xgen.go b/plc4go/spi/pool/executor_plc4xgen.go
index 7278d348a0..8a914e3ac6 100644
--- a/plc4go/spi/pool/executor_plc4xgen.go
+++ b/plc4go/spi/pool/executor_plc4xgen.go
@@ -79,10 +79,6 @@ func (d *executor) SerializeWithWriteBuffer(ctx context.Context, writeBuffer uti
 		return err
 	}
 
-	if err := writeBuffer.WriteInt64("queueDepth", 64, int64(d.queueDepth)); err != nil {
-		return err
-	}
-
 	_workItems_plx4gen_description := fmt.Sprintf("%d element(s)", len(d.workItems))
 	if err := writeBuffer.WriteString("workItems", uint32(len(_workItems_plx4gen_description)*8), "UTF-8", _workItems_plx4gen_description); err != nil {
 		return err
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 15ecac74ca..3f575ca75f 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -31,12 +31,44 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+func Test_newExecutor(t *testing.T) {
+	type args struct {
+		queueDepth int
+		workers    []*worker
+		log        zerolog.Logger
+	}
+	tests := []struct {
+		name        string
+		args        args
+		want        *executor
+		manipulator func(t *testing.T, want *executor, got *executor)
+	}{
+		{
+			name: "just create it",
+			want: &executor{},
+			manipulator: func(t *testing.T, want *executor, got *executor) {
+				assert.NotNil(t, got.workItems)
+				want.workItems = got.workItems
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := newExecutor(tt.args.queueDepth, tt.args.workers, tt.args.log)
+			want := tt.want
+			if tt.manipulator != nil {
+				tt.manipulator(t, want, got)
+			}
+			assert.Equalf(t, want, got, "newExecutor(%v, %v, %v)", tt.args.queueDepth, tt.args.workers, tt.args.log)
+		})
+	}
+}
+
 func Test_executor_Close(t *testing.T) {
 	type fields struct {
 		running      bool
 		shutdown     bool
 		worker       []*worker
-		queueDepth   int
 		workItems    chan workItem
 		traceWorkers bool
 	}
@@ -56,7 +88,6 @@ func Test_executor_Close(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queueDepth:   tt.fields.queueDepth,
 				workItems:    tt.fields.workItems,
 				traceWorkers: tt.fields.traceWorkers,
 				log:          produceTestingLogger(t),
@@ -71,7 +102,6 @@ func Test_executor_IsRunning(t *testing.T) {
 		running      bool
 		shutdown     bool
 		worker       []*worker
-		queueDepth   int
 		workItems    chan workItem
 		traceWorkers bool
 	}
@@ -90,7 +120,6 @@ func Test_executor_IsRunning(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queueDepth:   tt.fields.queueDepth,
 				workItems:    tt.fields.workItems,
 				traceWorkers: tt.fields.traceWorkers,
 				log:          produceTestingLogger(t),
@@ -311,7 +340,6 @@ func Test_executor_getWorkerWaitGroup(t *testing.T) {
 		running      bool
 		shutdown     bool
 		worker       []*worker
-		queueDepth   int
 		workItems    chan workItem
 		traceWorkers bool
 	}
@@ -331,7 +359,6 @@ func Test_executor_getWorkerWaitGroup(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queueDepth:   tt.fields.queueDepth,
 				workItems:    tt.fields.workItems,
 				traceWorkers: tt.fields.traceWorkers,
 				log:          produceTestingLogger(t),
@@ -346,7 +373,6 @@ func Test_executor_getWorksItems(t *testing.T) {
 		running      bool
 		shutdown     bool
 		worker       []*worker
-		queueDepth   int
 		workItems    chan workItem
 		traceWorkers bool
 		log          zerolog.Logger
@@ -366,7 +392,6 @@ func Test_executor_getWorksItems(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queueDepth:   tt.fields.queueDepth,
 				workItems:    tt.fields.workItems,
 				traceWorkers: tt.fields.traceWorkers,
 				log:          tt.fields.log,
@@ -381,7 +406,6 @@ func Test_executor_isTraceWorkers(t *testing.T) {
 		running      bool
 		shutdown     bool
 		worker       []*worker
-		queueDepth   int
 		workItems    chan workItem
 		traceWorkers bool
 	}
@@ -400,7 +424,6 @@ func Test_executor_isTraceWorkers(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queueDepth:   tt.fields.queueDepth,
 				workItems:    tt.fields.workItems,
 				traceWorkers: tt.fields.traceWorkers,
 				log:          produceTestingLogger(t),
@@ -415,7 +438,6 @@ func Test_executor_String(t *testing.T) {
 		running      bool
 		shutdown     bool
 		worker       []*worker
-		queueDepth   int
 		workItems    chan workItem
 		traceWorkers bool
 	}
@@ -441,23 +463,22 @@ func Test_executor_String(t *testing.T) {
 						}(),
 					},
 				},
-				queueDepth:   2,
 				traceWorkers: true,
 			},
 			want: `
-╔═executor═══════════════════════════════════════════════════════════════════════════════════════════════╗
-║╔═running╗╔═shutdown╗                                                                                   ║
-║║b1 true ║║ b1 true ║                                                                                   ║
-║╚════════╝╚═════════╝                                                                                   ║
-║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗║
-║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║
-║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║
-║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║
-║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║
-║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║
-║║0x0000000000000002 2║║0 element(s)║║   b1 true   ║                                                     ║
-║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║
-╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
+╔═executor═════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
+║╔═running╗╔═shutdown╗                                                                                                 ║
+║║b1 true ║║ b1 true ║                                                                                                 ║
+║╚════════╝╚═════════╝                                                                                                 ║
+║╔═worker/value/worker══════════════════════════════════════════════════════════════════════════════════╗╔═workItems══╗║
+║║╔═id═════════════════╗╔═lastReceived════════════════╗╔═running╗╔═shutdown╗╔═interrupted╗╔═interrupter╗║║0 element(s)║║
+║║║0x0000000000000001 1║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║╚════════════╝║
+║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║              ║
+║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝              ║
+║╔═traceWorkers╗                                                                                                       ║
+║║   b1 true   ║                                                                                                       ║
+║╚═════════════╝                                                                                                       ║
+╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝`[1:],
 		},
 	}
 	for _, tt := range tests {
@@ -466,7 +487,6 @@ func Test_executor_String(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queueDepth:   tt.fields.queueDepth,
 				workItems:    tt.fields.workItems,
 				traceWorkers: tt.fields.traceWorkers,
 				log:          produceTestingLogger(t),
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index acf3bfae09..276bf8178d 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -654,9 +654,9 @@ func Test_requestTransactionManager_String(t *testing.T) {
 ║║║║0x0000000000000000 0║║0001-01-01 00:00:00 +0000 UTC║║b0 false║║b0 false ║║  b0 false  ║║0 element(s)║║║           ║
 ║║║╚════════════════════╝╚═════════════════════════════╝╚════════╝╚═════════╝╚════════════╝╚════════════╝║║           ║
 ║║╚══════════════════════════════════════════════════════════════════════════════════════════════════════╝║           ║
-║║╔═queueDepth═════════╗╔═workItems══╗╔═traceWorkers╗                                                     ║           ║
-║║║0x0000000000000001 1║║0 element(s)║║  b0 false   ║                                                     ║           ║
-║║╚════════════════════╝╚════════════╝╚═════════════╝                                                     ║           ║
+║║╔═workItems══╗╔═traceWorkers╗                                                                           ║           ║
+║║║0 element(s)║║  b0 false   ║                                                                           ║           ║
+║║╚════════════╝╚═════════════╝                                                                           ║           ║
 ║╚════════════════════════════════════════════════════════════════════════════════════════════════════════╝           ║
 ║╔═traceTransactionManagerTransactions╗                                                                               ║
 ║║              b1 true               ║                                                                               ║