You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/13 12:17:21 UTC
[plc4x] branch develop updated (ff1810481f -> 5422176096)
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
from ff1810481f build(deps): bump asciidoctorj-diagram from 2.2.8 to 2.2.9 (#986)
new d7d54912ee refactor(plc4go/spi): split up request transaction into separate file
new ade510700a refactor(plc4go/spi): split up pool into multiple files
new caa9718473 feat(plc4go/spi): added more Stringer implementations
new 5422176096 feat(plc4go/cbus): added more Stringer implementations
The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
plc4go/internal/cbus/Configuration.go | 5 +
plc4go/internal/cbus/Configuration_test.go | 60 ++
plc4go/internal/cbus/Connection.go | 46 +-
plc4go/internal/cbus/Connection_test.go | 11 +-
plc4go/internal/cbus/DriverContext.go | 6 +
plc4go/internal/cbus/MessageCodec.go | 21 +
plc4go/internal/cbus/MessageCodec_test.go | 49 ++
plc4go/internal/cbus/Subscriber.go | 76 +--
plc4go/internal/cbus/Subscriber_test.go | 96 ++-
plc4go/spi/default/DefaultCodec.go | 17 +
plc4go/spi/default/DefaultCodec_test.go | 43 ++
plc4go/spi/default/DefaultConnection.go | 15 +
plc4go/spi/default/DefaultConnection_test.go | 40 ++
plc4go/spi/pool/CompletionFuture.go | 90 +++
plc4go/spi/pool/CompletionFuture_test.go | 191 ++++++
plc4go/spi/pool/WorkerPool.go | 393 +-----------
plc4go/spi/pool/WorkerPool_test.go | 689 ---------------------
plc4go/spi/pool/dynamicExecutor.go | 186 ++++++
plc4go/spi/pool/dynamicExecutor_test.go | 213 +++++++
plc4go/spi/pool/executor.go | 148 +++++
plc4go/spi/pool/executor_test.go | 479 ++++++++++++++
.../model/plc_message.go => spi/pool/workItem.go} | 21 +-
.../pool/workItem_test.go} | 34 +-
plc4go/spi/pool/worker.go | 113 ++++
plc4go/spi/pool/worker_test.go | 253 ++++++++
plc4go/spi/transactions/RequestTransaction.go | 148 +++++
.../spi/transactions/RequestTransactionManager.go | 140 +----
.../transactions/RequestTransactionManager_test.go | 401 +++---------
plc4go/spi/transactions/RequestTransaction_test.go | 319 ++++++++++
.../{mock_requirements.go => completedFuture.go} | 21 +-
plc4go/spi/transactions/completedFuture_test.go | 106 ++++
31 files changed, 2812 insertions(+), 1618 deletions(-)
create mode 100644 plc4go/spi/pool/CompletionFuture.go
create mode 100644 plc4go/spi/pool/CompletionFuture_test.go
create mode 100644 plc4go/spi/pool/dynamicExecutor.go
create mode 100644 plc4go/spi/pool/dynamicExecutor_test.go
create mode 100644 plc4go/spi/pool/executor.go
create mode 100644 plc4go/spi/pool/executor_test.go
copy plc4go/{pkg/api/model/plc_message.go => spi/pool/workItem.go} (71%)
copy plc4go/{internal/cbus/DriverContext_test.go => spi/pool/workItem_test.go} (63%)
create mode 100644 plc4go/spi/pool/worker.go
create mode 100644 plc4go/spi/pool/worker_test.go
create mode 100644 plc4go/spi/transactions/RequestTransaction.go
create mode 100644 plc4go/spi/transactions/RequestTransaction_test.go
copy plc4go/spi/transactions/{mock_requirements.go => completedFuture.go} (72%)
create mode 100644 plc4go/spi/transactions/completedFuture_test.go
[plc4x] 03/04: feat(plc4go/spi): added more Stringer implementations
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit caa9718473faa3ebb4fc71951fd3cf4ead000c17
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 14:16:57 2023 +0200
feat(plc4go/spi): added more Stringer implementations
---
plc4go/spi/default/DefaultCodec.go | 17 ++++
plc4go/spi/default/DefaultCodec_test.go | 43 ++++++++
plc4go/spi/default/DefaultConnection.go | 15 +++
plc4go/spi/default/DefaultConnection_test.go | 40 ++++++++
plc4go/spi/pool/dynamicExecutor.go | 13 +++
plc4go/spi/pool/dynamicExecutor_test.go | 43 ++++++++
plc4go/spi/pool/executor.go | 19 ++++
plc4go/spi/pool/executor_test.go | 66 ++++++++++++
plc4go/spi/pool/workItem.go | 10 +-
plc4go/spi/pool/workItem_test.go | 6 +-
plc4go/spi/pool/worker.go | 17 ++++
plc4go/spi/pool/worker_test.go | 26 +++++
.../spi/transactions/RequestTransactionManager.go | 33 ++++--
.../transactions/RequestTransactionManager_test.go | 113 +++++++++++++++++----
plc4go/spi/transactions/completedFuture.go | 9 +-
plc4go/spi/transactions/completedFuture_test.go | 106 +++++++++++++++++++
16 files changed, 549 insertions(+), 27 deletions(-)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 30b70557b6..afd3052c9c 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -244,6 +244,7 @@ func (m *defaultCodec) HandleMessages(message spi.Message) bool {
messageHandled := false
m.log.Trace().Msgf("Current number of expectations: %d", len(m.expectations))
for index, expectation := range m.expectations {
+ m.log.Trace().Msgf("Checking expectation %s", expectation)
// Check if the current message matches the expectations
// If it does, let it handle the message.
if accepts := expectation.GetAcceptsMessage()(message); accepts {
@@ -354,3 +355,19 @@ func (m *defaultCodec) passToDefaultIncomingMessageChannel(workerLog zerolog.Log
workerLog.Warn().Msgf("Message discarded\n%s", message)
}
}
+
+func (m *defaultCodec) String() string {
+ return fmt.Sprintf("DefaultCodec{\n"+
+ "\tTransportInstance: %s,\n"+
+ "\tDefaultIncomingMessageChannel: %d elements,\n"+
+ "\tExpectations: %s,\n"+
+ "\trunning: %t,\n"+
+ "\tcustomMessageHandling: %t,\n"+
+ "}",
+ m.transportInstance,
+ len(m.defaultIncomingMessageChannel),
+ m.expectations,
+ m.running,
+ m.customMessageHandling != nil,
+ )
+}
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index 664b92c986..a186912b71 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"testing"
@@ -1194,3 +1195,45 @@ func Test_defaultCodec_Work(t *testing.T) {
})
}
}
+
+func Test_defaultCodec_String(t *testing.T) {
+ type fields struct {
+ DefaultCodecRequirements DefaultCodecRequirements
+ transportInstance transports.TransportInstance
+ defaultIncomingMessageChannel chan spi.Message
+ expectations []spi.Expectation
+ running bool
+ customMessageHandling func(codec DefaultCodecRequirements, message spi.Message) bool
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "string it",
+ want: "DefaultCodec{\n" +
+ "\tTransportInstance: %!s(<nil>),\n" +
+ "\tDefaultIncomingMessageChannel: 0 elements,\n" +
+ "\tExpectations: [],\n" +
+ "\trunning: false,\n" +
+ "\tcustomMessageHandling: false,\n" +
+ "}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := &defaultCodec{
+ DefaultCodecRequirements: tt.fields.DefaultCodecRequirements,
+ transportInstance: tt.fields.transportInstance,
+ defaultIncomingMessageChannel: tt.fields.defaultIncomingMessageChannel,
+ expectations: tt.fields.expectations,
+ running: tt.fields.running,
+ customMessageHandling: tt.fields.customMessageHandling,
+ log: tt.fields.log,
+ }
+ assert.Equalf(t, tt.want, m.String(), "String()")
+ })
+ }
+}
diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index 2d3a0f9fa5..67caedf5cd 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -21,6 +21,7 @@ package _default
import (
"context"
+ "fmt"
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/rs/zerolog"
"runtime/debug"
@@ -359,6 +360,20 @@ func (d *defaultConnection) GetPlcValueHandler() spi.PlcValueHandler {
return d.valueHandler
}
+func (d *defaultConnection) String() string {
+ return fmt.Sprintf("DefaultConnection{\n"+
+ "\tttl: %s,\n"+
+ "\tconnected: %t,\n"+
+ "\ttagHandler: %s,\n"+
+ "\tvalueHandler: %s,\n"+
+ "}",
+ d.defaultTtl,
+ d.connected,
+ d.tagHandler,
+ d.valueHandler,
+ )
+}
+
func (m DefaultConnectionMetadata) GetConnectionAttributes() map[string]string {
return m.ConnectionAttributes
}
diff --git a/plc4go/spi/default/DefaultConnection_test.go b/plc4go/spi/default/DefaultConnection_test.go
index c013b0690e..2e26883a1c 100644
--- a/plc4go/spi/default/DefaultConnection_test.go
+++ b/plc4go/spi/default/DefaultConnection_test.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"github.com/apache/plc4x/plc4go/spi/tracer"
+ "github.com/rs/zerolog"
"testing"
"time"
@@ -1319,3 +1320,42 @@ func Test_plcConnectionPingResult_GetErr(t *testing.T) {
})
}
}
+
+func Test_defaultConnection_String(t *testing.T) {
+ type fields struct {
+ DefaultConnectionRequirements DefaultConnectionRequirements
+ defaultTtl time.Duration
+ connected bool
+ tagHandler spi.PlcTagHandler
+ valueHandler spi.PlcValueHandler
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "string it",
+ want: "DefaultConnection{\n" +
+ "\tttl: 0s,\n" +
+ "\tconnected: false,\n" +
+ "\ttagHandler: %!s(<nil>),\n" +
+ "\tvalueHandler: %!s(<nil>),\n" +
+ "}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ d := &defaultConnection{
+ DefaultConnectionRequirements: tt.fields.DefaultConnectionRequirements,
+ defaultTtl: tt.fields.defaultTtl,
+ connected: tt.fields.connected,
+ tagHandler: tt.fields.tagHandler,
+ valueHandler: tt.fields.valueHandler,
+ log: tt.fields.log,
+ }
+ assert.Equalf(t, tt.want, d.String(), "String()")
+ })
+ }
+}
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 22f8d20e5b..ae4135d60a 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -20,6 +20,7 @@
package pool
import (
+ "fmt"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/rs/zerolog"
"runtime/debug"
@@ -171,3 +172,15 @@ func (e *dynamicExecutor) Stop() {
e.dynamicWorkers.Wait()
e.log.Trace().Msg("stopped")
}
+
+func (e *dynamicExecutor) String() string {
+ return fmt.Sprintf("dynamicExecutor{\n"+
+ "\texecutor: %s\n"+
+ "\tmaxNumberOfWorkers: %d\n"+
+ "\tcurrentNumberOfWorkers: %d\n"+
+ "}",
+ e.executor,
+ e.maxNumberOfWorkers,
+ e.currentNumberOfWorkers.Load(),
+ )
+}
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 1db92a5d7f..190fceb41d 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -20,6 +20,7 @@
package pool
import (
+ "github.com/stretchr/testify/assert"
"testing"
"time"
)
@@ -168,3 +169,45 @@ func Test_dynamicExecutor_Stop(t *testing.T) {
})
}
}
+
+func Test_dynamicExecutor_String(t *testing.T) {
+ type fields struct {
+ executor *executor
+ maxNumberOfWorkers int
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "string it",
+ fields: fields{
+ executor: &executor{},
+ maxNumberOfWorkers: 3,
+ },
+ want: "dynamicExecutor{\n" +
+ "\texecutor: executor{\n" +
+ "\trunning: false,\n" +
+ "\tshutdown: false,\n" +
+ "\tworker: [],\n" +
+ "\tqueueDepth: 0,\n" +
+ "\tworkItems: 0 elements,\n" +
+ "\ttraceWorkers: false,\n" +
+ "\n" +
+ "}\n" +
+ "\tmaxNumberOfWorkers: 3\n" +
+ "\tcurrentNumberOfWorkers: 0\n" +
+ "}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ e := &dynamicExecutor{
+ executor: tt.fields.executor,
+ maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+ }
+ assert.Equalf(t, tt.want, e.String(), "String()")
+ })
+ }
+}
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 79bddb6dc0..0e8e458903 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -21,6 +21,7 @@ package pool
import (
"context"
+ "fmt"
"sync"
"sync/atomic"
@@ -127,3 +128,21 @@ func (e *executor) Close() error {
func (e *executor) IsRunning() bool {
return e.running && !e.shutdown
}
+
+func (e *executor) String() string {
+ return fmt.Sprintf("executor{\n"+
+ "\trunning: %t,\n"+
+ "\tshutdown: %t,\n"+
+ "\tworker: %s,\n"+
+ "\tqueueDepth: %d,\n"+
+ "\tworkItems: %d elements,\n"+
+ "\ttraceWorkers: %t,\n"+
+ "\n}",
+ e.running,
+ e.shutdown,
+ e.worker,
+ e.queueDepth,
+ len(e.workItems),
+ e.traceWorkers,
+ )
+}
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 2a43b1366e..577ce986c0 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -25,6 +25,7 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"sync"
+ "sync/atomic"
"testing"
"time"
)
@@ -411,3 +412,68 @@ func Test_executor_isTraceWorkers(t *testing.T) {
})
}
}
+
+func Test_executor_String(t *testing.T) {
+ type fields struct {
+ running bool
+ shutdown bool
+ worker []*worker
+ queueDepth int
+ workItems chan workItem
+ traceWorkers bool
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "string it",
+ fields: fields{
+ running: true,
+ shutdown: true,
+ worker: []*worker{
+ {
+ id: 1,
+ shutdown: atomic.Bool{},
+ interrupted: atomic.Bool{},
+ hasEnded: atomic.Bool{},
+ lastReceived: time.Time{},
+ },
+ },
+ queueDepth: 2,
+ traceWorkers: true,
+ },
+ want: "executor{\n" +
+ "\trunning: true,\n" +
+ "\tshutdown: true,\n" +
+ "\tworker: [worker{\n" +
+ "\tid: 1,\n" +
+ "\tshutdown: false,\n" +
+ "\tinterrupted: false,\n" +
+ "\thasEnded: false,\n" +
+ "\tlastReceived: 0001-01-01 00:00:00 +0000 UTC,\n" +
+ "}],\n" +
+ "\tqueueDepth: 2,\n" +
+ "\tworkItems: 0 elements,\n" +
+ "\ttraceWorkers: true,\n" +
+ "\n" +
+ "}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ e := &executor{
+ running: tt.fields.running,
+ shutdown: tt.fields.shutdown,
+ worker: tt.fields.worker,
+ queueDepth: tt.fields.queueDepth,
+ workItems: tt.fields.workItems,
+ traceWorkers: tt.fields.traceWorkers,
+ log: tt.fields.log,
+ }
+ assert.Equalf(t, tt.want, e.String(), "String()")
+ })
+ }
+}
diff --git a/plc4go/spi/pool/workItem.go b/plc4go/spi/pool/workItem.go
index 942480bc1a..233df1315d 100644
--- a/plc4go/spi/pool/workItem.go
+++ b/plc4go/spi/pool/workItem.go
@@ -28,5 +28,13 @@ type workItem struct {
}
func (w workItem) String() string {
- return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
+ return fmt.Sprintf("workItem{\n"+
+ "\twid: %d,\n"+
+ "\trunnable: %t,\n"+
+ "\tcompletionFuture: %s,\n"+
+ "}",
+ w.workItemId,
+ w.runnable != nil,
+ w.completionFuture,
+ )
}
diff --git a/plc4go/spi/pool/workItem_test.go b/plc4go/spi/pool/workItem_test.go
index ebc4b30c5c..1477f69012 100644
--- a/plc4go/spi/pool/workItem_test.go
+++ b/plc4go/spi/pool/workItem_test.go
@@ -37,7 +37,11 @@ func Test_workItem_String(t *testing.T) {
}{
{
name: "Simple test",
- want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
+ want: "workItem{\n" +
+ "\twid: 0,\n" +
+ "\trunnable: false,\n" +
+ "\tcompletionFuture: <nil>,\n" +
+ "}",
},
}
for _, tt := range tests {
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 7530ed0ad9..77909cd1b6 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -20,6 +20,7 @@
package pool
import (
+ "fmt"
"runtime/debug"
"sync"
"sync/atomic"
@@ -94,3 +95,19 @@ func (w *worker) work() {
w.hasEnded.Store(true)
workerLog.Debug().Msg("setting to ended")
}
+
+func (w *worker) String() string {
+ return fmt.Sprintf("worker{\n"+
+ "\tid: %d,\n"+
+ "\tshutdown: %v,\n"+
+ "\tinterrupted: %t,\n"+
+ "\thasEnded: %t,\n"+
+ "\tlastReceived: %s,\n"+
+ "}",
+ w.id,
+ w.shutdown.Load(),
+ w.interrupted.Load(),
+ w.hasEnded.Load(),
+ w.lastReceived,
+ )
+}
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 4f604d2efb..31c0ece2c0 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -225,3 +225,29 @@ func Test_worker_work(t *testing.T) {
})
}
}
+
+func Test_worker_String(t *testing.T) {
+ type fields struct {
+ id int
+ lastReceived time.Time
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "string it",
+ want: "worker{\n\tid: 0,\n\tshutdown: false,\n\tinterrupted: false,\n\thasEnded: false,\n\tlastReceived: 0001-01-01 00:00:00 +0000 UTC,\n}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ w := &worker{
+ id: tt.fields.id,
+ lastReceived: tt.fields.lastReceived,
+ }
+ assert.Equalf(t, tt.want, w.String(), "String()")
+ })
+ }
+}
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 8a93dc6f87..fe63035ebd 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -22,6 +22,7 @@ package transactions
import (
"container/list"
"context"
+ "fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"io"
@@ -60,7 +61,7 @@ type RequestTransactionManager interface {
func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...options.WithOption) RequestTransactionManager {
_requestTransactionManager := &requestTransactionManager{
numberOfConcurrentRequests: numberOfConcurrentRequests,
- transactionId: 0,
+ currentTransactionId: 0,
workLog: *list.New(),
executor: sharedExecutorInstance,
@@ -98,8 +99,8 @@ type requestTransactionManager struct {
// How many transactions are allowed to run at the same time?
numberOfConcurrentRequests int
// Assigns each request a Unique Transaction Id, especially important for failure handling
- transactionId int32
- transactionMutex sync.RWMutex
+ currentTransactionId int32
+ transactionMutex sync.RWMutex
// Important, this is a FIFO Queue for Fairness!
workLog list.List
workLogMutex sync.RWMutex
@@ -162,9 +163,9 @@ func (r *requestTransactionManager) processWorklog() {
func (r *requestTransactionManager) StartTransaction() RequestTransaction {
r.transactionMutex.Lock()
defer r.transactionMutex.Unlock()
- currentTransactionId := r.transactionId
- r.transactionId += 1
- transactionLogger := r.log.With().Int32("transactionId", currentTransactionId).Logger()
+ currentTransactionId := r.currentTransactionId
+ r.currentTransactionId += 1
+ transactionLogger := r.log.With().Int32("currentTransactionId", currentTransactionId).Logger()
if !r.traceTransactionManagerTransactions {
transactionLogger = zerolog.Nop()
}
@@ -246,3 +247,23 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
r.runningRequests = nil
return r.executor.Close()
}
+
+func (r *requestTransactionManager) String() string {
+ return fmt.Sprintf("RequestTransactionManager{\n"+
+ "\trunningRequests: %s,\n"+
+ "\tnumberOfConcurrentRequests: %d,\n"+
+ "\tcurrentTransactionId: %d,\n"+
+ "\tworkLog: %d elements,\n"+
+ "\texecutor: %s,\n"+
+ "\tshutdown: %t,\n"+
+ "\ttraceTransactionManagerTransactions: %t,\n"+
+ "}",
+ r.runningRequests,
+ r.numberOfConcurrentRequests,
+ r.currentTransactionId,
+ r.workLog.Len(),
+ r.executor,
+ r.shutdown,
+ r.traceTransactionManagerTransactions,
+ )
+}
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 4445edad1e..4925ce7790 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -106,7 +106,7 @@ func Test_requestTransactionManager_SetNumberOfConcurrentRequests(t *testing.T)
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
}
@@ -135,7 +135,7 @@ func Test_requestTransactionManager_SetNumberOfConcurrentRequests(t *testing.T)
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
@@ -148,7 +148,7 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
shutdown bool
@@ -194,7 +194,7 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
shutdown: tt.fields.shutdown,
@@ -212,7 +212,7 @@ func Test_requestTransactionManager_endRequest(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
}
@@ -249,7 +249,7 @@ func Test_requestTransactionManager_endRequest(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
@@ -264,7 +264,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
log zerolog.Logger
@@ -304,7 +304,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
log: tt.fields.log,
@@ -320,7 +320,7 @@ func Test_requestTransactionManager_getNumberOfActiveRequests(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
}
@@ -338,7 +338,7 @@ func Test_requestTransactionManager_getNumberOfActiveRequests(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
@@ -353,7 +353,7 @@ func Test_requestTransactionManager_processWorklog(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
}
@@ -401,7 +401,7 @@ func Test_requestTransactionManager_processWorklog(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
@@ -414,7 +414,7 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
}
@@ -442,7 +442,7 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
}
@@ -455,7 +455,7 @@ func Test_requestTransactionManager_Close(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
shutdown bool
@@ -486,7 +486,7 @@ func Test_requestTransactionManager_Close(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
shutdown: tt.fields.shutdown,
@@ -502,7 +502,7 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
numberOfConcurrentRequests int
- transactionId int32
+ currentTransactionId int32
workLog list.List
executor pool.Executor
shutdown bool
@@ -566,7 +566,7 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
r := &requestTransactionManager{
runningRequests: tt.fields.runningRequests,
numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
- transactionId: tt.fields.transactionId,
+ currentTransactionId: tt.fields.currentTransactionId,
workLog: tt.fields.workLog,
executor: tt.fields.executor,
shutdown: tt.fields.shutdown,
@@ -577,3 +577,80 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
})
}
}
+
+func Test_requestTransactionManager_String(t *testing.T) {
+ type fields struct {
+ runningRequests []*requestTransaction
+ numberOfConcurrentRequests int
+ currentTransactionId int32
+ workLog list.List
+ executor pool.Executor
+ shutdown bool
+ traceTransactionManagerTransactions bool
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "string it",
+ fields: fields{
+ runningRequests: []*requestTransaction{
+ {
+ transactionId: 2,
+ },
+ },
+ numberOfConcurrentRequests: 3,
+ currentTransactionId: 4,
+ workLog: func() list.List {
+ v := list.List{}
+ v.PushBack(nil)
+ return v
+ }(),
+ executor: pool.NewFixedSizeExecutor(1, 1),
+ shutdown: true,
+ traceTransactionManagerTransactions: true,
+ },
+ want: "RequestTransactionManager{\n" +
+ "\trunningRequests: [Transaction{tid:2}],\n" +
+ "\tnumberOfConcurrentRequests: 3,\n" +
+ "\tcurrentTransactionId: 4,\n" +
+ "\tworkLog: 1 elements,\n" +
+ "\texecutor: executor{\n" +
+ "\trunning: false,\n" +
+ "\tshutdown: false,\n" +
+ "\tworker: [worker{\n" +
+ "\tid: 0,\n" +
+ "\tshutdown: false,\n" +
+ "\tinterrupted: false,\n" +
+ "\thasEnded: false,\n" +
+ "\tlastReceived: 0001-01-01 00:00:00 +0000 UTC,\n" +
+ "}],\n" +
+ "\tqueueDepth: 1,\n" +
+ "\tworkItems: 0 elements,\n" +
+ "\ttraceWorkers: false,\n" +
+ "\n" +
+ "},\n" +
+ "\tshutdown: true,\n" +
+ "\ttraceTransactionManagerTransactions: true,\n" +
+ "}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ r := &requestTransactionManager{
+ runningRequests: tt.fields.runningRequests,
+ numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
+ currentTransactionId: tt.fields.currentTransactionId,
+ workLog: tt.fields.workLog,
+ executor: tt.fields.executor,
+ shutdown: tt.fields.shutdown,
+ traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+ log: tt.fields.log,
+ }
+ assert.Equalf(t, tt.want, r.String(), "String()")
+ })
+ }
+}
diff --git a/plc4go/spi/transactions/completedFuture.go b/plc4go/spi/transactions/completedFuture.go
index ac8227f579..9d5ef8d9ab 100644
--- a/plc4go/spi/transactions/completedFuture.go
+++ b/plc4go/spi/transactions/completedFuture.go
@@ -19,7 +19,10 @@
package transactions
-import "context"
+import (
+ "context"
+ "fmt"
+)
type completedFuture struct {
err error
@@ -32,3 +35,7 @@ func (c completedFuture) AwaitCompletion(_ context.Context) error {
func (completedFuture) Cancel(_ bool, _ error) {
// No op
}
+
+func (c completedFuture) String() string {
+ return fmt.Sprintf("completedFuture{\n\terr: %v,\n}", c.err)
+}
diff --git a/plc4go/spi/transactions/completedFuture_test.go b/plc4go/spi/transactions/completedFuture_test.go
new file mode 100644
index 0000000000..c875decec7
--- /dev/null
+++ b/plc4go/spi/transactions/completedFuture_test.go
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package transactions
+
+import (
+ "context"
+ "fmt"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func Test_completedFuture_AwaitCompletion(t *testing.T) {
+ type fields struct {
+ err error
+ }
+ type args struct {
+ in0 context.Context
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "does nothing",
+ wantErr: assert.NoError,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := completedFuture{
+ err: tt.fields.err,
+ }
+ tt.wantErr(t, c.AwaitCompletion(tt.args.in0), fmt.Sprintf("AwaitCompletion(%v)", tt.args.in0))
+ })
+ }
+}
+
+func Test_completedFuture_Cancel(t *testing.T) {
+ type fields struct {
+ err error
+ }
+ type args struct {
+ in0 bool
+ in1 error
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ }{
+ {
+ name: "does nothing",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ co := completedFuture{
+ err: tt.fields.err,
+ }
+ co.Cancel(tt.args.in0, tt.args.in1)
+ })
+ }
+}
+
+func Test_completedFuture_String(t *testing.T) {
+ type fields struct {
+ err error
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "gives the error",
+ want: "completedFuture{\n\terr: <nil>,\n}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := completedFuture{
+ err: tt.fields.err,
+ }
+ assert.Equalf(t, tt.want, c.String(), "String()")
+ })
+ }
+}
[plc4x] 04/04: feat(plc4go/cbus): added more Stringer implementations
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 54221760969bf088eb07df4eb8a46f933dff6ca0
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 14:17:04 2023 +0200
feat(plc4go/cbus): added more Stringer implementations
---
plc4go/internal/cbus/Configuration.go | 5 ++
plc4go/internal/cbus/Configuration_test.go | 60 +++++++++++++++++++
plc4go/internal/cbus/Connection.go | 46 ++++++++++++--
plc4go/internal/cbus/Connection_test.go | 11 +++-
plc4go/internal/cbus/DriverContext.go | 6 ++
plc4go/internal/cbus/MessageCodec.go | 21 +++++++
plc4go/internal/cbus/MessageCodec_test.go | 49 +++++++++++++++
plc4go/internal/cbus/Subscriber.go | 76 ++++++++++++-----------
plc4go/internal/cbus/Subscriber_test.go | 96 ++++++++++++------------------
9 files changed, 270 insertions(+), 100 deletions(-)
diff --git a/plc4go/internal/cbus/Configuration.go b/plc4go/internal/cbus/Configuration.go
index 5655c87473..bd02271184 100644
--- a/plc4go/internal/cbus/Configuration.go
+++ b/plc4go/internal/cbus/Configuration.go
@@ -20,6 +20,7 @@
package cbus
import (
+ "fmt"
"github.com/rs/zerolog"
"reflect"
"strconv"
@@ -98,3 +99,7 @@ func getFromOptions(localLog zerolog.Logger, options map[string][]string, key st
}
return ""
}
+
+func (c Configuration) String() string {
+ return fmt.Sprintf("%#v", c) // Go-syntax dump of every field; %#v consults GoStringer, not Stringer, so this does not recurse into String()
+}
diff --git a/plc4go/internal/cbus/Configuration_test.go b/plc4go/internal/cbus/Configuration_test.go
index 733d56ce27..17779239a8 100644
--- a/plc4go/internal/cbus/Configuration_test.go
+++ b/plc4go/internal/cbus/Configuration_test.go
@@ -201,3 +201,63 @@ func Test_getFromOptions(t *testing.T) {
})
}
}
+
+func TestConfiguration_String(t *testing.T) {
+ type fields struct {
+ Srchk bool
+ Exstat bool
+ Pun bool
+ LocalSal bool
+ Pcn bool
+ Idmon bool
+ Monitor bool
+ Smart bool
+ XonXoff bool
+ Connect bool
+ MonitoredApplication1 byte
+ MonitoredApplication2 byte
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "string it", // every field is set to a non-zero value so each one is visible in the output
+ fields: fields{
+ Srchk: true,
+ Exstat: true,
+ Pun: true,
+ LocalSal: true,
+ Pcn: true,
+ Idmon: true,
+ Monitor: true,
+ Smart: true,
+ XonXoff: true,
+ Connect: true,
+ MonitoredApplication1: 2,
+ MonitoredApplication2: 3,
+ },
+ want: "cbus.Configuration{Srchk:true, Exstat:true, Pun:true, LocalSal:true, Pcn:true, Idmon:true, Monitor:true, Smart:true, XonXoff:true, Connect:true, MonitoredApplication1:0x2, MonitoredApplication2:0x3}", // Go-syntax (%#v) form; the two byte fields are printed in hex
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ c := Configuration{ // copy the table's fields one-to-one into the real struct
+ Srchk: tt.fields.Srchk,
+ Exstat: tt.fields.Exstat,
+ Pun: tt.fields.Pun,
+ LocalSal: tt.fields.LocalSal,
+ Pcn: tt.fields.Pcn,
+ Idmon: tt.fields.Idmon,
+ Monitor: tt.fields.Monitor,
+ Smart: tt.fields.Smart,
+ XonXoff: tt.fields.XonXoff,
+ Connect: tt.fields.Connect,
+ MonitoredApplication1: tt.fields.MonitoredApplication1,
+ MonitoredApplication2: tt.fields.MonitoredApplication2,
+ }
+ assert.Equalf(t, tt.want, c.String(), "String()")
+ })
+ }
+}
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 9d65646a7a..393007d21c 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/tracer"
"github.com/apache/plc4x/plc4go/spi/transactions"
"github.com/rs/zerolog"
+ "github.com/rs/zerolog/log"
"runtime/debug"
"sync"
"time"
@@ -57,6 +58,10 @@ func (t *AlphaGenerator) getAndIncrement() byte {
return result
}
+func (t *AlphaGenerator) String() string {
+ return fmt.Sprintf("AlphaGenerator(currentAlpha: %c)", t.currentAlpha) // %c prints currentAlpha as a character rather than its numeric value
+}
+
type Connection struct {
_default.DefaultConnection
alphaGenerator AlphaGenerator
@@ -166,7 +171,11 @@ func (c *Connection) WriteRequestBuilder() apiModel.PlcWriteRequestBuilder {
}
func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
- return spiModel.NewDefaultPlcSubscriptionRequestBuilder(c.GetPlcTagHandler(), c.GetPlcValueHandler(), NewSubscriber(c, options.WithCustomLogger(c.log)))
+ return spiModel.NewDefaultPlcSubscriptionRequestBuilder(
+ c.GetPlcTagHandler(),
+ c.GetPlcValueHandler(),
+ NewSubscriber(c.addSubscriber, options.WithCustomLogger(c.log)),
+ )
}
func (c *Connection) UnsubscriptionRequestBuilder() apiModel.PlcUnsubscriptionRequestBuilder {
@@ -189,7 +198,28 @@ func (c *Connection) addSubscriber(subscriber *Subscriber) {
}
func (c *Connection) String() string {
- return fmt.Sprintf("cbus.Connection")
+ return fmt.Sprintf(
+ "cbus.Connection{\n"+
+ "\tDefaultConnection: %s,\n"+
+ "\tAlphaGenerator: %s\n"+
+ "\tMessageCodec: %s\n"+
+ "\tsubscribers: %s\n"+
+ "\ttm: %s\n"+
+ "\tconfiguration: %s\n"+
+ "\tdriverContext: %s\n"+
+ "\tconnectionId: %s\n"+
+ "\ttracer: %s\n"+
+ "}",
+ c.DefaultConnection,
+ &c.alphaGenerator,
+ c.messageCodec,
+ c.subscribers,
+ c.tm,
+ c.configuration,
+ c.driverContext,
+ c.connectionId,
+ c.tracer,
+ )
}
func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
@@ -235,12 +265,16 @@ func (c *Connection) startSubscriptionHandler() {
c.log.Debug().Msg("SAL handler stated")
for c.IsConnected() {
for monitoredSal := range c.messageCodec.monitoredSALs {
+ handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredSAL(monitoredSal); ok {
c.log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
- continue
+ handled = true
}
}
+ if !handled {
+ log.Debug().Msgf("SAL was not handled:\n%s", monitoredSal)
+ }
}
}
c.log.Info().Msg("Ending SAL handler")
@@ -255,12 +289,16 @@ func (c *Connection) startSubscriptionHandler() {
c.log.Debug().Msg("default MMI started")
for c.IsConnected() {
for calReply := range c.messageCodec.monitoredMMIs {
+ handled := false
for _, subscriber := range c.subscribers {
if ok := subscriber.handleMonitoredMMI(calReply); ok {
c.log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
- continue
+ handled = true
}
}
+ if !handled {
+ log.Debug().Msgf("MMI was not handled:\n%s", calReply)
+ }
}
}
c.log.Info().Msg("Ending MMI handler")
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 8feae638e5..b96d950f00 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -539,7 +539,16 @@ func TestConnection_String(t *testing.T) {
}{
{
name: "a string",
- want: "cbus.Connection",
+ want: "cbus.Connection{\n" +
+ "\tDefaultConnection: %!s(<nil>),\n" +
+ "\tAlphaGenerator: AlphaGenerator(currentAlpha: \x00)\n" +
+ "\tMessageCodec: <nil>\n" +
+ "\tsubscribers: []\n" +
+ "\ttm: %!s(<nil>)\n" +
+ "\tconfiguration: cbus.Configuration{Srchk:false, Exstat:false, Pun:false, LocalSal:false, Pcn:false, Idmon:false, Monitor:false, Smart:false, XonXoff:false, Connect:false, MonitoredApplication1:0x0, MonitoredApplication2:0x0}\n\tdriverContext: cbus.DriverContext{awaitSetupComplete:false, awaitDisconnectComplete:false}\n" +
+ "\tconnectionId: \n" +
+ "\ttracer: <nil>\n" +
+ "}",
},
}
for _, tt := range tests {
diff --git a/plc4go/internal/cbus/DriverContext.go b/plc4go/internal/cbus/DriverContext.go
index f0ed07b1f9..7c4307c960 100644
--- a/plc4go/internal/cbus/DriverContext.go
+++ b/plc4go/internal/cbus/DriverContext.go
@@ -19,6 +19,8 @@
package cbus
+import "fmt"
+
type DriverContext struct {
awaitSetupComplete bool
awaitDisconnectComplete bool
@@ -27,3 +29,7 @@ type DriverContext struct {
func NewDriverContext(_ Configuration) DriverContext {
return DriverContext{}
}
+
+func (d DriverContext) String() string {
+ return fmt.Sprintf("%#v", d) // Go-syntax dump including the unexported fields; %#v consults GoStringer, not Stringer, so this does not recurse into String()
+}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 1c4c3e3743..b8119c7d8f 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -21,6 +21,7 @@ package cbus
import (
"bufio"
+ "fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/rs/zerolog"
"hash/crc32"
@@ -322,6 +323,26 @@ lookingForTheEnd:
return cBusMessage, nil
}
+// String renders the codec state as a multi-line, human-readable block.
+// For the two monitor channels only the number of currently queued
+// elements is reported, not their content.
+func (m *MessageCodec) String() string {
+ return fmt.Sprintf("MessageCodec{\n"+
+ "\tDefaultCodec: %s,\n"+
+ "\trequestContext: %s,\n"+
+ "\tcbusOptions: %s,\n"+
+ "\tmonitoredMMIs: %d elements,\n"+
+ "\tmonitoredSALs: %d elements,\n"+
+ "\tlastPackageHash: %d,\n"+
+ "\thashEncountered: %d,\n"+
+ "}",
+ m.DefaultCodec,
+ m.requestContext,
+ m.cbusOptions,
+ len(m.monitoredMMIs), // queued MMI replies; must not be taken from the SAL channel
+ len(m.monitoredSALs), // queued SALs
+ m.lastPackageHash,
+ m.hashEncountered,
+ )
+}
+
func extractMMIAndSAL(log zerolog.Logger) _default.CustomMessageHandler {
return func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
log.Trace().Msgf("Custom handling message:\n%s", message)
diff --git a/plc4go/internal/cbus/MessageCodec_test.go b/plc4go/internal/cbus/MessageCodec_test.go
index 2c11659dfb..68abdb02e0 100644
--- a/plc4go/internal/cbus/MessageCodec_test.go
+++ b/plc4go/internal/cbus/MessageCodec_test.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/plc4x/plc4go/spi/testutils"
"github.com/apache/plc4x/plc4go/spi/transports"
"github.com/apache/plc4x/plc4go/spi/transports/test"
+ "github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"testing"
)
@@ -878,3 +879,51 @@ func Test_extractMMIAndSAL(t *testing.T) {
})
}
}
+
+func TestMessageCodec_String(t *testing.T) {
+ type fields struct {
+ DefaultCodec _default.DefaultCodec
+ requestContext readWriteModel.RequestContext
+ cbusOptions readWriteModel.CBusOptions
+ monitoredMMIs chan readWriteModel.CALReply
+ monitoredSALs chan readWriteModel.MonitoredSAL
+ lastPackageHash uint32
+ hashEncountered uint
+ currentlyReportedServerErrors uint
+ log zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "string it", // zero-value codec: nil fields print as %!s(<nil>) and the nil channels report 0 elements
+ want: "MessageCodec{\n" +
+ "\tDefaultCodec: %!s(<nil>),\n" +
+ "\trequestContext: %!s(<nil>),\n" +
+ "\tcbusOptions: %!s(<nil>),\n" +
+ "\tmonitoredMMIs: 0 elements,\n" +
+ "\tmonitoredSALs: 0 elements,\n" +
+ "\tlastPackageHash: 0,\n" +
+ "\thashEncountered: 0,\n" +
+ "}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ m := &MessageCodec{
+ DefaultCodec: tt.fields.DefaultCodec,
+ requestContext: tt.fields.requestContext,
+ cbusOptions: tt.fields.cbusOptions,
+ monitoredMMIs: tt.fields.monitoredMMIs,
+ monitoredSALs: tt.fields.monitoredSALs,
+ lastPackageHash: tt.fields.lastPackageHash,
+ hashEncountered: tt.fields.hashEncountered,
+ currentlyReportedServerErrors: tt.fields.currentlyReportedServerErrors, // set for completeness; not part of the expected String() output
+ log: tt.fields.log, // set for completeness; not part of the expected String() output
+ }
+ assert.Equalf(t, tt.want, m.String(), "String()")
+ })
+ }
+}
diff --git a/plc4go/internal/cbus/Subscriber.go b/plc4go/internal/cbus/Subscriber.go
index e0fef87a17..0454ac9ebb 100644
--- a/plc4go/internal/cbus/Subscriber.go
+++ b/plc4go/internal/cbus/Subscriber.go
@@ -37,22 +37,22 @@ import (
)
type Subscriber struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
log zerolog.Logger
}
-func NewSubscriber(connection *Connection, _options ...options.WithOption) *Subscriber {
+func NewSubscriber(addSubscriber func(subscriber *Subscriber), _options ...options.WithOption) *Subscriber {
return &Subscriber{
- connection: connection,
- consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
+ addSubscriber: addSubscriber,
+ consumers: make(map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer),
log: options.ExtractCustomLogger(_options...),
}
}
-func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
+func (s *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.PlcSubscriptionRequest) <-chan apiModel.PlcSubscriptionRequestResult {
result := make(chan apiModel.PlcSubscriptionRequestResult, 1)
go func() {
defer func() {
@@ -63,14 +63,14 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
internalPlcSubscriptionRequest := subscriptionRequest.(*spiModel.DefaultPlcSubscriptionRequest)
// Add this subscriber to the connection.
- m.connection.addSubscriber(m)
+ s.addSubscriber(s)
// Just populate all requests with an OK
responseCodes := map[string]apiModel.PlcResponseCode{}
subscriptionValues := make(map[string]apiModel.PlcSubscriptionHandle)
for _, tagName := range internalPlcSubscriptionRequest.GetTagNames() {
responseCodes[tagName] = apiModel.PlcResponseCode_OK
- subscriptionValues[tagName] = NewSubscriptionHandle(m, tagName, internalPlcSubscriptionRequest.GetTag(tagName), internalPlcSubscriptionRequest.GetType(tagName), internalPlcSubscriptionRequest.GetInterval(tagName))
+ subscriptionValues[tagName] = NewSubscriptionHandle(s, tagName, internalPlcSubscriptionRequest.GetTag(tagName), internalPlcSubscriptionRequest.GetType(tagName), internalPlcSubscriptionRequest.GetInterval(tagName))
}
result <- spiModel.NewDefaultPlcSubscriptionRequestResult(
@@ -79,7 +79,7 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
subscriptionRequest,
responseCodes,
subscriptionValues,
- options.WithCustomLogger(m.log),
+ options.WithCustomLogger(s.log),
),
nil,
)
@@ -87,7 +87,7 @@ func (m *Subscriber) Subscribe(_ context.Context, subscriptionRequest apiModel.P
return result
}
-func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
+func (s *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiModel.PlcUnsubscriptionRequest) <-chan apiModel.PlcUnsubscriptionRequestResult {
// TODO: handle context
result := make(chan apiModel.PlcUnsubscriptionRequestResult, 1)
result <- spiModel.NewDefaultPlcUnsubscriptionRequestResult(unsubscriptionRequest, nil, errors.New("Not Implemented"))
@@ -100,7 +100,7 @@ func (m *Subscriber) Unsubscribe(ctx context.Context, unsubscriptionRequest apiM
return result
}
-func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
+func (s *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
var unitAddressString string
switch calReply := calReply.(type) {
case readWriteModel.CALReplyLongExactly:
@@ -119,18 +119,18 @@ func (m *Subscriber) handleMonitoredMMI(calReply readWriteModel.CALReply) bool {
}
calData := calReply.GetCalData()
handled := false
- for registration, consumer := range m.consumers {
+ for registration, consumer := range s.consumers {
for _, subscriptionHandle := range registration.GetSubscriptionHandles() {
- handled = handled || m.offerMMI(unitAddressString, calData, subscriptionHandle.(*SubscriptionHandle), consumer)
+ handled = handled || s.offerMMI(unitAddressString, calData, subscriptionHandle.(*SubscriptionHandle), consumer)
}
}
return handled
}
-func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.CALData, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
+func (s *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.CALData, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
tag, ok := subscriptionHandle.tag.(*mmiMonitorTag)
if !ok {
- m.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
+ s.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
return false
}
@@ -146,7 +146,7 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
if !strings.HasSuffix(unitAddressString, unitSuffix) {
- m.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
+ s.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
return false
}
}
@@ -228,12 +228,12 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
plcValues[tagName] = spiValues.NewPlcList(plcListValues)
}
default:
- m.log.Error().Msgf("Unmapped type %T", calData)
+ s.log.Error().Msgf("Unmapped type %T", calData)
return false
}
if application := tag.GetApplication(); application != nil {
if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
- m.log.Debug().Msgf("Current application id %s doesn't match actual id %s", unitAddressString, actualApplicationIdString)
+ s.log.Debug().Msgf("Current application id %s doesn't match actual id %s", unitAddressString, actualApplicationIdString)
return false
}
}
@@ -249,20 +249,20 @@ func (m *Subscriber) offerMMI(unitAddressString string, calData readWriteModel.C
return true
}
-func (m *Subscriber) handleMonitoredSAL(sal readWriteModel.MonitoredSAL) bool {
+func (s *Subscriber) handleMonitoredSAL(sal readWriteModel.MonitoredSAL) bool {
handled := false
- for registration, consumer := range m.consumers {
+ for registration, consumer := range s.consumers {
for _, subscriptionHandle := range registration.GetSubscriptionHandles() {
- handled = handled || m.offerSAL(sal, subscriptionHandle.(*SubscriptionHandle), consumer)
+ handled = handled || s.offerSAL(sal, subscriptionHandle.(*SubscriptionHandle), consumer)
}
}
return handled
}
-func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
+func (s *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandle *SubscriptionHandle, consumer apiModel.PlcSubscriptionEventConsumer) bool {
tag, ok := subscriptionHandle.tag.(*salMonitorTag)
if !ok {
- m.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
+ s.log.Debug().Msgf("Unusable tag for mmi subscription %s", subscriptionHandle.tag)
return false
}
tags := map[string]apiModel.PlcTag{}
@@ -306,7 +306,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
if unitAddress := tag.GetUnitAddress(); unitAddress != nil {
unitSuffix := fmt.Sprintf("u%d", unitAddress.GetAddress())
if !strings.HasSuffix(unitAddressString, unitSuffix) {
- m.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
+ s.log.Debug().Msgf("Current address string %s has not the suffix %s", unitAddressString, unitSuffix)
return false
}
}
@@ -314,7 +314,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
if application := tag.GetApplication(); application != nil {
if actualApplicationIdString := application.ApplicationId().String(); applicationString != actualApplicationIdString {
- m.log.Debug().Msgf("Current application id %s doesn't match actual id %s", unitAddressString, actualApplicationIdString)
+ s.log.Debug().Msgf("Current application id %s doesn't match actual id %s", unitAddressString, actualApplicationIdString)
return false
}
}
@@ -336,7 +336,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
case readWriteModel.SALDataErrorReportingExactly:
commandTypeGetter = salData.GetErrorReportingData().GetCommandType()
case readWriteModel.SALDataFreeUsageExactly:
- m.log.Info().Msg("Unknown command type")
+ s.log.Info().Msg("Unknown command type")
case readWriteModel.SALDataHeatingExactly:
commandTypeGetter = salData.GetHeatingData().GetCommandType()
case readWriteModel.SALDataHvacActuatorExactly:
@@ -354,9 +354,9 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
case readWriteModel.SALDataPoolsSpasPondsFountainsControlExactly:
commandTypeGetter = salData.GetPoolsSpaPondsFountainsData().GetCommandType()
case readWriteModel.SALDataReservedExactly:
- m.log.Info().Msg("Unknown command type")
+ s.log.Info().Msg("Unknown command type")
case readWriteModel.SALDataRoomControlSystemExactly:
- m.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
+ s.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
case readWriteModel.SALDataSecurityExactly:
commandTypeGetter = salData.GetSecurityData().GetCommandType()
case readWriteModel.SALDataTelephonyStatusAndControlExactly:
@@ -364,13 +364,13 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
case readWriteModel.SALDataTemperatureBroadcastExactly:
commandTypeGetter = salData.GetTemperatureBroadcastData().GetCommandType()
case readWriteModel.SALDataTestingExactly:
- m.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
+ s.log.Info().Msg("Unknown command type not implemented yet") // TODO: implement once there
case readWriteModel.SALDataTriggerControlExactly:
commandTypeGetter = salData.GetTriggerControlData().GetCommandType()
case readWriteModel.SALDataVentilationExactly:
commandTypeGetter = salData.GetVentilationData().GetCommandType()
default:
- m.log.Error().Msgf("Unmapped type %T", salData)
+ s.log.Error().Msgf("Unmapped type %T", salData)
}
commandType := "Unknown"
if commandTypeGetter != nil {
@@ -383,7 +383,7 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
rbvb := spiValues.NewWriteBufferPlcValueBased()
err := salData.SerializeWithWriteBuffer(context.Background(), rbvb)
if err != nil {
- m.log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
+ s.log.Error().Err(err).Msg("Error serializing to plc value... just returning it as string")
plcValues[tagName] = spiValues.NewPlcSTRING(fmt.Sprintf("%s", salData))
} else {
plcValues[tagName] = rbvb.GetPlcValue()
@@ -397,12 +397,16 @@ func (m *Subscriber) offerSAL(sal readWriteModel.MonitoredSAL, subscriptionHandl
return true
}
-func (m *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
- consumerRegistration := spiModel.NewDefaultPlcConsumerRegistration(m, consumer, handles...)
- m.consumers[consumerRegistration.(*spiModel.DefaultPlcConsumerRegistration)] = consumer
+func (s *Subscriber) Register(consumer apiModel.PlcSubscriptionEventConsumer, handles []apiModel.PlcSubscriptionHandle) apiModel.PlcConsumerRegistration {
+ consumerRegistration := spiModel.NewDefaultPlcConsumerRegistration(s, consumer, handles...)
+ s.consumers[consumerRegistration.(*spiModel.DefaultPlcConsumerRegistration)] = consumer
return consumerRegistration
}
-func (m *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
- delete(m.consumers, registration.(*spiModel.DefaultPlcConsumerRegistration))
+func (s *Subscriber) Unregister(registration apiModel.PlcConsumerRegistration) {
+ delete(s.consumers, registration.(*spiModel.DefaultPlcConsumerRegistration))
+}
+
+// String gives a short summary of the subscriber: its type name and the
+// number of registered consumers.
+func (s *Subscriber) String() string {
+ return fmt.Sprintf("cbus.Subscriber{\n\tconsumers: %d elements\n}", len(s.consumers))
}
diff --git a/plc4go/internal/cbus/Subscriber_test.go b/plc4go/internal/cbus/Subscriber_test.go
index d5d794c950..b36f7ae481 100644
--- a/plc4go/internal/cbus/Subscriber_test.go
+++ b/plc4go/internal/cbus/Subscriber_test.go
@@ -21,9 +21,6 @@ package cbus
import (
"context"
- "github.com/apache/plc4x/plc4go/spi/options"
- "github.com/apache/plc4x/plc4go/spi/testutils"
- "github.com/apache/plc4x/plc4go/spi/utils"
"testing"
"time"
@@ -36,7 +33,7 @@ import (
func TestNewSubscriber(t *testing.T) {
type args struct {
- connection *Connection
+ addSubscriber func(subscriber *Subscriber)
}
tests := []struct {
name string
@@ -52,15 +49,15 @@ func TestNewSubscriber(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- assert.Equalf(t, tt.want, NewSubscriber(tt.args.connection), "NewSubscriber(%v)", tt.args.connection)
+ assert.Equalf(t, tt.want, NewSubscriber(tt.args.addSubscriber), "NewSubscriber(%t)", tt.args.addSubscriber != nil)
})
}
}
func TestSubscriber_Subscribe(t *testing.T) {
type fields struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
}
type args struct {
in0 context.Context
@@ -80,28 +77,9 @@ func TestSubscriber_Subscribe(t *testing.T) {
subscriptionRequest: spiModel.NewDefaultPlcSubscriptionRequest(nil, []string{"blub"}, map[string]apiModel.PlcTag{"blub": NewMMIMonitorTag(readWriteModel.NewUnitAddress(1), nil, 1)}, nil, nil, nil),
},
setup: func(t *testing.T, fields *fields, args *args) {
- // Setup logger
- logger := testutils.ProduceTestingLogger(t)
-
- loggerOption := options.WithCustomLogger(logger)
-
- // Set the model logger to the logger above
- testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
-
- codec := NewMessageCodec(nil, loggerOption)
- connection := NewConnection(codec, Configuration{}, DriverContext{}, nil, nil, nil, loggerOption)
- t.Cleanup(func() {
- timer := time.NewTimer(1 * time.Second)
- t.Cleanup(func() {
- utils.CleanupTimer(timer)
- })
- select {
- case <-connection.Close():
- case <-timer.C:
- t.Error("timeout")
- }
- })
- fields.connection = connection
+ fields.addSubscriber = func(subscriber *Subscriber) {
+ assert.NotNil(t, subscriber)
+ }
},
wantAsserter: func(t *testing.T, results <-chan apiModel.PlcSubscriptionRequestResult) bool {
timer := time.NewTimer(2 * time.Second)
@@ -123,8 +101,8 @@ func TestSubscriber_Subscribe(t *testing.T) {
tt.setup(t, &tt.fields, &tt.args)
}
m := &Subscriber{
- connection: tt.fields.connection,
- consumers: tt.fields.consumers,
+ addSubscriber: tt.fields.addSubscriber,
+ consumers: tt.fields.consumers,
}
assert.Truef(t, tt.wantAsserter(t, m.Subscribe(tt.args.in0, tt.args.subscriptionRequest)), "Subscribe(%v, %v)", tt.args.in0, tt.args.subscriptionRequest)
})
@@ -133,8 +111,8 @@ func TestSubscriber_Subscribe(t *testing.T) {
func TestSubscriber_Unsubscribe(t *testing.T) {
type fields struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
}
type args struct {
ctx context.Context
@@ -158,8 +136,8 @@ func TestSubscriber_Unsubscribe(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Subscriber{
- connection: tt.fields.connection,
- consumers: tt.fields.consumers,
+ addSubscriber: tt.fields.addSubscriber,
+ consumers: tt.fields.consumers,
}
assert.Truef(t, tt.wantAsserter(t, m.Unsubscribe(tt.args.ctx, tt.args.unsubscriptionRequest)), "Unsubscribe(%v, %v)", tt.args.ctx, tt.args.unsubscriptionRequest)
})
@@ -168,8 +146,8 @@ func TestSubscriber_Unsubscribe(t *testing.T) {
func TestSubscriber_handleMonitoredMMI(t *testing.T) {
type fields struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
}
type args struct {
calReply model.CALReply
@@ -249,8 +227,8 @@ func TestSubscriber_handleMonitoredMMI(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Subscriber{
- connection: tt.fields.connection,
- consumers: tt.fields.consumers,
+ addSubscriber: tt.fields.addSubscriber,
+ consumers: tt.fields.consumers,
}
assert.Equalf(t, tt.want, m.handleMonitoredMMI(tt.args.calReply), "handleMonitoredMMI(%v)", tt.args.calReply)
})
@@ -259,8 +237,8 @@ func TestSubscriber_handleMonitoredMMI(t *testing.T) {
func TestSubscriber_offerMMI(t *testing.T) {
type fields struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
}
type args struct {
unitAddressString string
@@ -501,8 +479,8 @@ func TestSubscriber_offerMMI(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Subscriber{
- connection: tt.fields.connection,
- consumers: tt.fields.consumers,
+ addSubscriber: tt.fields.addSubscriber,
+ consumers: tt.fields.consumers,
}
assert.Equalf(t, tt.want, m.offerMMI(tt.args.unitAddressString, tt.args.calData, tt.args.subscriptionHandle, tt.args.consumerProvider(t)), "offerMMI(%v,\n%v\n, \n%v\n, func())", tt.args.unitAddressString, tt.args.calData, tt.args.subscriptionHandle)
})
@@ -511,8 +489,8 @@ func TestSubscriber_offerMMI(t *testing.T) {
func TestSubscriber_handleMonitoredSAL(t *testing.T) {
type fields struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
}
type args struct {
sal model.MonitoredSAL
@@ -546,8 +524,8 @@ func TestSubscriber_handleMonitoredSAL(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Subscriber{
- connection: tt.fields.connection,
- consumers: tt.fields.consumers,
+ addSubscriber: tt.fields.addSubscriber,
+ consumers: tt.fields.consumers,
}
assert.Equalf(t, tt.want, m.handleMonitoredSAL(tt.args.sal), "handleMonitoredSAL(%v)", tt.args.sal)
})
@@ -556,8 +534,8 @@ func TestSubscriber_handleMonitoredSAL(t *testing.T) {
func TestSubscriber_offerSAL(t *testing.T) {
type fields struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
}
type args struct {
sal model.MonitoredSAL
@@ -1368,8 +1346,8 @@ func TestSubscriber_offerSAL(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Subscriber{
- connection: tt.fields.connection,
- consumers: tt.fields.consumers,
+ addSubscriber: tt.fields.addSubscriber,
+ consumers: tt.fields.consumers,
}
assert.Equalf(t, tt.want, m.offerSAL(tt.args.sal, tt.args.subscriptionHandle, tt.args.consumerProvider(t)), "offerSAL(\n%v\n, \n%v\n)", tt.args.sal, tt.args.subscriptionHandle)
})
@@ -1378,8 +1356,8 @@ func TestSubscriber_offerSAL(t *testing.T) {
func TestSubscriber_Register(t *testing.T) {
type fields struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
}
type args struct {
consumer apiModel.PlcSubscriptionEventConsumer
@@ -1400,8 +1378,8 @@ func TestSubscriber_Register(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Subscriber{
- connection: tt.fields.connection,
- consumers: tt.fields.consumers,
+ addSubscriber: tt.fields.addSubscriber,
+ consumers: tt.fields.consumers,
}
assert.NotNilf(t, m.Register(tt.args.consumer, tt.args.handles), "Register(func(), %v)", tt.args.handles)
})
@@ -1410,8 +1388,8 @@ func TestSubscriber_Register(t *testing.T) {
func TestSubscriber_Unregister(t *testing.T) {
type fields struct {
- connection *Connection
- consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
+ addSubscriber func(subscriber *Subscriber)
+ consumers map[*spiModel.DefaultPlcConsumerRegistration]apiModel.PlcSubscriptionEventConsumer
}
type args struct {
registration apiModel.PlcConsumerRegistration
@@ -1431,8 +1409,8 @@ func TestSubscriber_Unregister(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Subscriber{
- connection: tt.fields.connection,
- consumers: tt.fields.consumers,
+ addSubscriber: tt.fields.addSubscriber,
+ consumers: tt.fields.consumers,
}
m.Unregister(tt.args.registration)
})
[plc4x] 01/04: refactor(plc4go/spi): split up request transaction into separate file
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit d7d54912ee102858bd60a28aaea08713a6b32bcb
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 12:29:06 2023 +0200
refactor(plc4go/spi): split up request transaction into separate file
---
plc4go/spi/transactions/RequestTransaction.go | 148 ++++++++++
.../spi/transactions/RequestTransactionManager.go | 113 --------
.../transactions/RequestTransactionManager_test.go | 288 -------------------
plc4go/spi/transactions/RequestTransaction_test.go | 319 +++++++++++++++++++++
plc4go/spi/transactions/completedFuture.go | 34 +++
5 files changed, 501 insertions(+), 401 deletions(-)
diff --git a/plc4go/spi/transactions/RequestTransaction.go b/plc4go/spi/transactions/RequestTransaction.go
new file mode 100644
index 0000000000..de8c5bed84
--- /dev/null
+++ b/plc4go/spi/transactions/RequestTransaction.go
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package transactions
+
+import (
+ "context"
+ "fmt"
+ "github.com/apache/plc4x/plc4go/spi/pool"
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog"
+ "github.com/rs/zerolog/log"
+ "sync"
+ "time"
+)
+
+// RequestTransaction represents a transaction
+type RequestTransaction interface {
+ fmt.Stringer
+ // FailRequest signals that this transaction has failed
+ FailRequest(err error) error
+ // EndRequest signals that this transaction is done
+ EndRequest() error
+ // Submit submits a RequestTransactionRunnable to the RequestTransactionManager
+ Submit(operation RequestTransactionRunnable)
+ // AwaitCompletion waits for this RequestTransaction to finish. Returns an error if it finished unsuccessfully
+ AwaitCompletion(ctx context.Context) error
+ // IsCompleted indicates that this RequestTransaction is completed
+ IsCompleted() bool
+}
+
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
+type requestTransaction struct {
+ parent *requestTransactionManager
+ transactionId int32
+
+ /** The initial operation to perform to kick off the request */
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
+
+ stateChangeMutex sync.Mutex
+ completed bool
+
+ transactionLog zerolog.Logger
+}
+
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
+
+func (t *requestTransaction) FailRequest(err error) error {
+ t.stateChangeMutex.Lock()
+ defer t.stateChangeMutex.Unlock()
+ if t.completed {
+ return errors.Wrap(err, "calling fail on a already completed transaction")
+ }
+ t.transactionLog.Trace().Msg("Fail the request")
+ t.completed = true
+ return t.parent.failRequest(t, err)
+}
+
+func (t *requestTransaction) EndRequest() error {
+ t.stateChangeMutex.Lock()
+ defer t.stateChangeMutex.Unlock()
+ if t.completed {
+ return errors.New("calling end on a already completed transaction")
+ }
+ t.transactionLog.Trace().Msg("Ending the request")
+ t.completed = true
+ // Remove it from Running Requests
+ return t.parent.endRequest(t)
+}
+
+func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
+ t.stateChangeMutex.Lock()
+ defer t.stateChangeMutex.Unlock()
+ if t.completed {
+ t.transactionLog.Warn().Msg("calling submit on a already completed transaction")
+ return
+ }
+ if t.operation != nil {
+ t.transactionLog.Warn().Msg("Operation already set")
+ }
+ t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
+ t.operation = func() {
+ t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
+ operation(t)
+ t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
+ }
+ t.parent.submitTransaction(t)
+}
+
+func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
+ timeout, cancelFunc := context.WithTimeout(ctx, time.Minute*30) // This is intentionally set very high
+ defer cancelFunc()
+ for t.completionFuture == nil {
+ time.Sleep(time.Millisecond * 10)
+ if err := timeout.Err(); err != nil {
+ log.Error().Msg("Timout after a long time. This means something is very of here")
+ return errors.Wrap(err, "Error waiting for completion future to be set")
+ }
+ }
+ if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
+ return err
+ }
+ stillActive := true
+ for stillActive {
+ stillActive = false
+ for _, runningRequest := range t.parent.runningRequests {
+ if runningRequest.transactionId == t.transactionId {
+ stillActive = true
+ break
+ }
+ }
+ }
+ return nil
+}
+
+func (t *requestTransaction) IsCompleted() bool {
+ return t.completed
+}
+
+func (t *requestTransaction) String() string {
+ return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
+}
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index c06e0de71e..8a93dc6f87 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -22,7 +22,6 @@ package transactions
import (
"container/list"
"context"
- "fmt"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"io"
@@ -46,21 +45,6 @@ func init() {
type RequestTransactionRunnable func(RequestTransaction)
-// RequestTransaction represents a transaction
-type RequestTransaction interface {
- fmt.Stringer
- // FailRequest signals that this transaction has failed
- FailRequest(err error) error
- // EndRequest signals that this transaction is done
- EndRequest() error
- // Submit submits a RequestTransactionRunnable to the RequestTransactionManager
- Submit(operation RequestTransactionRunnable)
- // AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
- AwaitCompletion(ctx context.Context) error
- // IsCompleted indicates that the that this RequestTransaction is completed
- IsCompleted() bool
-}
-
// RequestTransactionManager handles transactions
type RequestTransactionManager interface {
io.Closer
@@ -109,20 +93,6 @@ type withCustomExecutor struct {
executor pool.Executor
}
-type requestTransaction struct {
- parent *requestTransactionManager
- transactionId int32
-
- /** The initial operation to perform to kick off the request */
- operation pool.Runnable
- completionFuture pool.CompletionFuture
-
- stateChangeMutex sync.Mutex
- completed bool
-
- transactionLog zerolog.Logger
-}
-
type requestTransactionManager struct {
runningRequests []*requestTransaction
// How many transactions are allowed to run at the same time?
@@ -189,18 +159,6 @@ func (r *requestTransactionManager) processWorklog() {
}
}
-type completedFuture struct {
- err error
-}
-
-func (c completedFuture) AwaitCompletion(_ context.Context) error {
- return c.err
-}
-
-func (completedFuture) Cancel(_ bool, _ error) {
- // No op
-}
-
func (r *requestTransactionManager) StartTransaction() RequestTransaction {
r.transactionMutex.Lock()
defer r.transactionMutex.Unlock()
@@ -288,74 +246,3 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
r.runningRequests = nil
return r.executor.Close()
}
-
-func (t *requestTransaction) FailRequest(err error) error {
- t.stateChangeMutex.Lock()
- defer t.stateChangeMutex.Unlock()
- if t.completed {
- return errors.Wrap(err, "calling fail on a already completed transaction")
- }
- t.transactionLog.Trace().Msg("Fail the request")
- t.completed = true
- return t.parent.failRequest(t, err)
-}
-
-func (t *requestTransaction) EndRequest() error {
- t.stateChangeMutex.Lock()
- defer t.stateChangeMutex.Unlock()
- if t.completed {
- return errors.New("calling end on a already completed transaction")
- }
- t.transactionLog.Trace().Msg("Ending the request")
- t.completed = true
- // Remove it from Running Requests
- return t.parent.endRequest(t)
-}
-
-func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
- t.stateChangeMutex.Lock()
- defer t.stateChangeMutex.Unlock()
- if t.completed {
- t.transactionLog.Warn().Msg("calling submit on a already completed transaction")
- return
- }
- if t.operation != nil {
- t.transactionLog.Warn().Msg("Operation already set")
- }
- t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
- t.operation = func() {
- t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
- operation(t)
- t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
- }
- t.parent.submitTransaction(t)
-}
-
-func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
- for t.completionFuture == nil {
- time.Sleep(time.Millisecond * 10)
- // TODO: this should timeout and not loop infinite...
- }
- if err := t.completionFuture.AwaitCompletion(ctx); err != nil {
- return err
- }
- stillActive := true
- for stillActive {
- stillActive = false
- for _, runningRequest := range t.parent.runningRequests {
- if runningRequest.transactionId == t.transactionId {
- stillActive = true
- break
- }
- }
- }
- return nil
-}
-
-func (t *requestTransaction) IsCompleted() bool {
- return t.completed
-}
-
-func (t *requestTransaction) String() string {
- return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
-}
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 7384f87871..4445edad1e 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -26,10 +26,8 @@ import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/testutils"
- "github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
"testing"
"time"
)
@@ -453,292 +451,6 @@ func Test_requestTransactionManager_submitTransaction(t *testing.T) {
}
}
-func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
- type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
- transactionLog zerolog.Logger
- }
- type args struct {
- ctx context.Context
- }
- tests := []struct {
- name string
- fields fields
- args args
- mockSetup func(t *testing.T, fields *fields, args *args)
- wantErr bool
- }{
- {
- name: "just wait",
- fields: fields{
- parent: &requestTransactionManager{
- runningRequests: []*requestTransaction{
- func() *requestTransaction {
- r := &requestTransaction{}
- go func() {
- time.Sleep(100 * time.Millisecond)
- // We fake an ending transaction like that
- r.transactionId = 1
- }()
- return r
- }(),
- },
- },
- },
- args: args{
- ctx: func() context.Context {
- ctx, cancelFunc := context.WithCancel(context.Background())
- cancelFunc()
- return ctx
- }(),
- },
- mockSetup: func(t *testing.T, fields *fields, args *args) {
- completionFuture := NewMockCompletionFuture(t)
- expect := completionFuture.EXPECT()
- expect.AwaitCompletion(mock.Anything).Return(nil)
- fields.completionFuture = completionFuture
- },
- },
- }
- for _, tt := range tests {
- t1.Run(tt.name, func(t1 *testing.T) {
- if tt.mockSetup != nil {
- tt.mockSetup(t1, &tt.fields, &tt.args)
- }
- t := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: tt.fields.transactionLog,
- }
- if err := t.AwaitCompletion(tt.args.ctx); (err != nil) != tt.wantErr {
- t1.Errorf("AwaitCompletion() error = %v, wantErr %v", err, tt.wantErr)
- }
- })
- }
-}
-
-func Test_requestTransaction_EndRequest(t1 *testing.T) {
- type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
- transactionLog zerolog.Logger
- completed bool
- }
- tests := []struct {
- name string
- fields fields
- wantErr bool
- }{
- {
- name: "just end it",
- fields: fields{
- parent: &requestTransactionManager{},
- },
- wantErr: true,
- },
- {
- name: "end it completed",
- fields: fields{
- parent: &requestTransactionManager{},
- completed: true,
- },
- wantErr: true,
- },
- }
- for _, tt := range tests {
- t1.Run(tt.name, func(t1 *testing.T) {
- t := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: tt.fields.transactionLog,
- completed: tt.fields.completed,
- }
- if err := t.EndRequest(); (err != nil) != tt.wantErr {
- t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr)
- }
- })
- }
-}
-
-func Test_requestTransaction_FailRequest(t1 *testing.T) {
- type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
- transactionLog zerolog.Logger
- completed bool
- }
- type args struct {
- err error
- }
- tests := []struct {
- name string
- fields fields
- args args
- mockSetup func(t *testing.T, fields *fields, args *args)
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "just fail it",
- fields: fields{
- parent: &requestTransactionManager{},
- },
- mockSetup: func(t *testing.T, fields *fields, args *args) {
- completionFuture := NewMockCompletionFuture(t)
- expect := completionFuture.EXPECT()
- expect.Cancel(true, nil).Return()
- fields.completionFuture = completionFuture
- },
- wantErr: assert.Error,
- },
- {
- name: "just fail it (completed)",
- args: args{
- err: errors.New("nope"),
- },
- fields: fields{
- parent: &requestTransactionManager{},
- completed: true,
- },
- wantErr: assert.Error,
- },
- }
- for _, tt := range tests {
- t1.Run(tt.name, func(t *testing.T) {
- if tt.mockSetup != nil {
- tt.mockSetup(t, &tt.fields, &tt.args)
- }
- r := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: tt.fields.transactionLog,
- completed: tt.fields.completed,
- }
- tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err)
- })
- }
-}
-
-func Test_requestTransaction_String(t1 *testing.T) {
- type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
- transactionLog zerolog.Logger
- }
- tests := []struct {
- name string
- fields fields
- want string
- }{
- {
- name: "give a string",
- want: "Transaction{tid:0}",
- },
- }
- for _, tt := range tests {
- t1.Run(tt.name, func(t1 *testing.T) {
- t := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: tt.fields.transactionLog,
- }
- if got := t.String(); got != tt.want {
- t1.Errorf("String() = %v, want %v", got, tt.want)
- }
- })
- }
-}
-
-func Test_requestTransaction_Submit(t1 *testing.T) {
- type fields struct {
- parent *requestTransactionManager
- transactionId int32
- operation pool.Runnable
- completionFuture pool.CompletionFuture
- transactionLog zerolog.Logger
- completed bool
- }
- type args struct {
- operation RequestTransactionRunnable
- }
- tests := []struct {
- name string
- fields fields
- args args
- }{
- {
- name: "submit something",
- fields: fields{
- parent: &requestTransactionManager{},
- },
- args: args{
- operation: func(_ RequestTransaction) {
- // NOOP
- },
- },
- },
- {
- name: "submit something again",
- fields: fields{
- parent: &requestTransactionManager{},
- operation: func() {
- // NOOP
- },
- },
- args: args{
- operation: func(_ RequestTransaction) {
- // NOOP
- },
- },
- },
- {
- name: "submit completed",
- fields: fields{
- parent: &requestTransactionManager{},
- operation: func() {
- // NOOP
- },
- completed: true,
- },
- args: args{
- operation: func(_ RequestTransaction) {
- // NOOP
- },
- },
- },
- }
- for _, tt := range tests {
- t1.Run(tt.name, func(t1 *testing.T) {
- t := &requestTransaction{
- parent: tt.fields.parent,
- transactionId: tt.fields.transactionId,
- operation: tt.fields.operation,
- completionFuture: tt.fields.completionFuture,
- transactionLog: tt.fields.transactionLog,
- completed: tt.fields.completed,
- }
- t.Submit(tt.args.operation)
- t.operation()
- })
- }
-}
-
func Test_requestTransactionManager_Close(t *testing.T) {
type fields struct {
runningRequests []*requestTransaction
diff --git a/plc4go/spi/transactions/RequestTransaction_test.go b/plc4go/spi/transactions/RequestTransaction_test.go
new file mode 100644
index 0000000000..5dad61f345
--- /dev/null
+++ b/plc4go/spi/transactions/RequestTransaction_test.go
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package transactions
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/apache/plc4x/plc4go/spi/pool"
+
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+)
+
+func Test_requestTransaction_EndRequest(t1 *testing.T) {
+ type fields struct {
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
+ transactionLog zerolog.Logger
+ completed bool
+ }
+ tests := []struct {
+ name string
+ fields fields
+ wantErr bool
+ }{
+ {
+ name: "just end it",
+ fields: fields{
+ parent: &requestTransactionManager{},
+ },
+ wantErr: true,
+ },
+ {
+ name: "end it completed",
+ fields: fields{
+ parent: &requestTransactionManager{},
+ completed: true,
+ },
+ wantErr: true,
+ },
+ }
+ for _, tt := range tests {
+ t1.Run(tt.name, func(t1 *testing.T) {
+ t := &requestTransaction{
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ completionFuture: tt.fields.completionFuture,
+ transactionLog: tt.fields.transactionLog,
+ completed: tt.fields.completed,
+ }
+ if err := t.EndRequest(); (err != nil) != tt.wantErr {
+ t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ })
+ }
+}
+
+func Test_requestTransaction_FailRequest(t1 *testing.T) {
+ type fields struct {
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
+ transactionLog zerolog.Logger
+ completed bool
+ }
+ type args struct {
+ err error
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ mockSetup func(t *testing.T, fields *fields, args *args)
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "just fail it",
+ fields: fields{
+ parent: &requestTransactionManager{},
+ },
+ mockSetup: func(t *testing.T, fields *fields, args *args) {
+ completionFuture := NewMockCompletionFuture(t)
+ expect := completionFuture.EXPECT()
+ expect.Cancel(true, nil).Return()
+ fields.completionFuture = completionFuture
+ },
+ wantErr: assert.Error,
+ },
+ {
+ name: "just fail it (completed)",
+ args: args{
+ err: errors.New("nope"),
+ },
+ fields: fields{
+ parent: &requestTransactionManager{},
+ completed: true,
+ },
+ wantErr: assert.Error,
+ },
+ }
+ for _, tt := range tests {
+ t1.Run(tt.name, func(t *testing.T) {
+ if tt.mockSetup != nil {
+ tt.mockSetup(t, &tt.fields, &tt.args)
+ }
+ r := &requestTransaction{
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ completionFuture: tt.fields.completionFuture,
+ transactionLog: tt.fields.transactionLog,
+ completed: tt.fields.completed,
+ }
+ tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err)
+ })
+ }
+}
+
+func Test_requestTransaction_String(t1 *testing.T) {
+ type fields struct {
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
+ transactionLog zerolog.Logger
+ }
+ tests := []struct {
+ name string
+ fields fields
+ want string
+ }{
+ {
+ name: "give a string",
+ want: "Transaction{tid:0}",
+ },
+ }
+ for _, tt := range tests {
+ t1.Run(tt.name, func(t1 *testing.T) {
+ t := &requestTransaction{
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ completionFuture: tt.fields.completionFuture,
+ transactionLog: tt.fields.transactionLog,
+ }
+ if got := t.String(); got != tt.want {
+ t1.Errorf("String() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func Test_requestTransaction_Submit(t1 *testing.T) {
+ type fields struct {
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
+ transactionLog zerolog.Logger
+ completed bool
+ }
+ type args struct {
+ operation RequestTransactionRunnable
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ }{
+ {
+ name: "submit something",
+ fields: fields{
+ parent: &requestTransactionManager{},
+ },
+ args: args{
+ operation: func(_ RequestTransaction) {
+ // NOOP
+ },
+ },
+ },
+ {
+ name: "submit something again",
+ fields: fields{
+ parent: &requestTransactionManager{},
+ operation: func() {
+ // NOOP
+ },
+ },
+ args: args{
+ operation: func(_ RequestTransaction) {
+ // NOOP
+ },
+ },
+ },
+ {
+ name: "submit completed",
+ fields: fields{
+ parent: &requestTransactionManager{},
+ operation: func() {
+ // NOOP
+ },
+ completed: true,
+ },
+ args: args{
+ operation: func(_ RequestTransaction) {
+ // NOOP
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t1.Run(tt.name, func(t1 *testing.T) {
+ t := &requestTransaction{
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ completionFuture: tt.fields.completionFuture,
+ transactionLog: tt.fields.transactionLog,
+ completed: tt.fields.completed,
+ }
+ t.Submit(tt.args.operation)
+ t.operation()
+ })
+ }
+}
+
+func Test_requestTransaction_AwaitCompletion(t1 *testing.T) {
+ type fields struct {
+ parent *requestTransactionManager
+ transactionId int32
+ operation pool.Runnable
+ completionFuture pool.CompletionFuture
+ transactionLog zerolog.Logger
+ }
+ type args struct {
+ ctx context.Context
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ mockSetup func(t *testing.T, fields *fields, args *args)
+ wantErr bool
+ }{
+ {
+ name: "just wait",
+ fields: fields{
+ parent: &requestTransactionManager{
+ runningRequests: []*requestTransaction{
+ func() *requestTransaction {
+ r := &requestTransaction{}
+ go func() {
+ time.Sleep(100 * time.Millisecond)
+ // We fake an ending transaction like that
+ r.transactionId = 1
+ }()
+ return r
+ }(),
+ },
+ },
+ },
+ args: args{
+ ctx: func() context.Context {
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ cancelFunc()
+ return ctx
+ }(),
+ },
+ mockSetup: func(t *testing.T, fields *fields, args *args) {
+ completionFuture := NewMockCompletionFuture(t)
+ expect := completionFuture.EXPECT()
+ expect.AwaitCompletion(mock.Anything).Return(nil)
+ fields.completionFuture = completionFuture
+ },
+ },
+ }
+ for _, tt := range tests {
+ t1.Run(tt.name, func(t1 *testing.T) {
+ if tt.mockSetup != nil {
+ tt.mockSetup(t1, &tt.fields, &tt.args)
+ }
+ t := &requestTransaction{
+ parent: tt.fields.parent,
+ transactionId: tt.fields.transactionId,
+ operation: tt.fields.operation,
+ completionFuture: tt.fields.completionFuture,
+ transactionLog: tt.fields.transactionLog,
+ }
+ if err := t.AwaitCompletion(tt.args.ctx); (err != nil) != tt.wantErr {
+ t1.Errorf("AwaitCompletion() error = %v, wantErr %v", err, tt.wantErr)
+ }
+ })
+ }
+}
diff --git a/plc4go/spi/transactions/completedFuture.go b/plc4go/spi/transactions/completedFuture.go
new file mode 100644
index 0000000000..ac8227f579
--- /dev/null
+++ b/plc4go/spi/transactions/completedFuture.go
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package transactions
+
+import "context"
+
+type completedFuture struct {
+ err error
+}
+
+func (c completedFuture) AwaitCompletion(_ context.Context) error {
+ return c.err
+}
+
+func (completedFuture) Cancel(_ bool, _ error) {
+ // No op
+}
[plc4x] 02/04: refactor(plc4go/spi): split up pool into multiple files
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit ade510700aab3ef7200064501bd2f01152919aa9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jun 13 12:44:07 2023 +0200
refactor(plc4go/spi): split up pool into multiple files
---
plc4go/spi/pool/CompletionFuture.go | 90 ++++
plc4go/spi/pool/CompletionFuture_test.go | 191 +++++++++
plc4go/spi/pool/WorkerPool.go | 393 +-----------------
plc4go/spi/pool/WorkerPool_test.go | 689 -------------------------------
plc4go/spi/pool/dynamicExecutor.go | 173 ++++++++
plc4go/spi/pool/dynamicExecutor_test.go | 170 ++++++++
plc4go/spi/pool/executor.go | 129 ++++++
plc4go/spi/pool/executor_test.go | 413 ++++++++++++++++++
plc4go/spi/pool/workItem.go | 32 ++
plc4go/spi/pool/workItem_test.go | 53 +++
plc4go/spi/pool/worker.go | 96 +++++
plc4go/spi/pool/worker_test.go | 227 ++++++++++
12 files changed, 1585 insertions(+), 1071 deletions(-)
diff --git a/plc4go/spi/pool/CompletionFuture.go b/plc4go/spi/pool/CompletionFuture.go
new file mode 100644
index 0000000000..5d11002356
--- /dev/null
+++ b/plc4go/spi/pool/CompletionFuture.go
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "context"
+ "fmt"
+ "sync/atomic"
+ "time"
+
+ "github.com/pkg/errors"
+)
+
+type CompletionFuture interface {
+ AwaitCompletion(ctx context.Context) error
+ Cancel(interrupt bool, err error)
+}
+
+type future struct {
+ cancelRequested atomic.Bool
+ interruptRequested atomic.Bool
+ completed atomic.Bool
+ errored atomic.Bool
+ err atomic.Value
+}
+
+func (f *future) Cancel(interrupt bool, err error) {
+ f.cancelRequested.Store(true)
+ f.interruptRequested.Store(interrupt)
+ if err != nil {
+ f.errored.Store(true)
+ f.err.Store(err)
+ }
+}
+
+func (f *future) complete() {
+ f.completed.Store(true)
+}
+
+// Canceled is returned on CompletionFuture.AwaitCompletion when a CompletionFuture was canceled
+var Canceled = errors.New("Canceled")
+
+func (f *future) AwaitCompletion(ctx context.Context) error {
+ for !f.completed.Load() && !f.errored.Load() && !f.cancelRequested.Load() && ctx.Err() == nil {
+ time.Sleep(time.Millisecond * 10)
+ }
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ if err, ok := f.err.Load().(error); ok {
+ return err
+ }
+ if f.cancelRequested.Load() {
+ return Canceled
+ }
+ return nil
+}
+
+func (f *future) String() string {
+ return fmt.Sprintf("future{\n"+
+ "\tcancelRequested: %t,\n"+
+ "\tinterruptRequested: %t,\n"+
+ "\tcompleted: %t,\n"+
+ "\terrored: %t,\n"+
+ "\terr: %v,\n"+
+ "}",
+ f.cancelRequested.Load(),
+ f.interruptRequested.Load(),
+ f.completed.Load(),
+ f.errored.Load(),
+ f.err.Load(),
+ )
+}
diff --git a/plc4go/spi/pool/CompletionFuture_test.go b/plc4go/spi/pool/CompletionFuture_test.go
new file mode 100644
index 0000000000..79fdaeace7
--- /dev/null
+++ b/plc4go/spi/pool/CompletionFuture_test.go
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "context"
+ "fmt"
+ "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ "time"
+)
+
+func Test_future_AwaitCompletion(t *testing.T) {
+ type args struct {
+ ctx context.Context
+ }
+ tests := []struct {
+ name string
+ args args
+ completer func(*future)
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "completes with error",
+ args: args{ctx: context.TODO()},
+ completer: func(f *future) {
+ f.Cancel(false, errors.New("Uh oh"))
+ },
+ wantErr: assert.Error,
+ },
+ {
+ name: "completes regular",
+ args: args{ctx: context.TODO()},
+ completer: func(f *future) {
+ time.Sleep(time.Millisecond * 30)
+ f.complete()
+ },
+ wantErr: assert.NoError,
+ },
+ {
+ name: "completes not int time",
+ args: args{ctx: func() context.Context {
+ deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Millisecond))
+ t.Cleanup(cancel)
+ return deadline
+ }()},
+ completer: func(f *future) {
+ time.Sleep(time.Millisecond * 300)
+ },
+ wantErr: assert.Error,
+ },
+ {
+ name: "completes canceled without error",
+ args: args{ctx: context.TODO()},
+ completer: func(f *future) {
+ time.Sleep(time.Millisecond * 300)
+ f.Cancel(true, nil)
+ },
+ wantErr: func(t assert.TestingT, err error, i ...any) bool {
+ assert.Same(t, Canceled, err)
+ return true
+ },
+ },
+ {
+ name: "completes canceled with particular error",
+ args: args{ctx: context.TODO()},
+ completer: func(f *future) {
+ time.Sleep(time.Millisecond * 300)
+ f.Cancel(true, errors.New("Uh oh"))
+ },
+ wantErr: func(t assert.TestingT, err error, i ...any) bool {
+ assert.Equal(t, "Uh oh", err.Error())
+ return true
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &future{}
+ go tt.completer(f)
+ tt.wantErr(t, f.AwaitCompletion(tt.args.ctx), fmt.Sprintf("AwaitCompletion(%v)", tt.args.ctx))
+ })
+ }
+}
+
+func Test_future_Cancel(t *testing.T) {
+ type args struct {
+ interrupt bool
+ err error
+ }
+ tests := []struct {
+ name string
+ args args
+ verifier func(*testing.T, *future)
+ }{
+ {
+ name: "cancel cancels",
+ verifier: func(t *testing.T, f *future) {
+ assert.True(t, f.cancelRequested.Load())
+ },
+ },
+ {
+ name: "cancel with interrupt",
+ args: args{
+ interrupt: true,
+ err: nil,
+ },
+ verifier: func(t *testing.T, f *future) {
+ assert.True(t, f.cancelRequested.Load())
+ assert.False(t, f.errored.Load())
+ assert.Nil(t, f.err.Load())
+ },
+ },
+ {
+ name: "cancel with err",
+ args: args{
+ interrupt: true,
+ err: errors.New("Uh Oh"),
+ },
+ verifier: func(t *testing.T, f *future) {
+ assert.True(t, f.cancelRequested.Load())
+ assert.True(t, f.errored.Load())
+ assert.NotNil(t, f.err.Load())
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &future{}
+ f.Cancel(tt.args.interrupt, tt.args.err)
+ tt.verifier(t, f)
+ })
+ }
+}
+
+func Test_future_String(t *testing.T) {
+ tests := []struct {
+ name string
+ want string
+ }{
+ {
+ name: "string it",
+ want: "future{\n\tcancelRequested: false,\n\tinterruptRequested: false,\n\tcompleted: false,\n\terrored: false,\n\terr: <nil>,\n}",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &future{}
+ assert.Equalf(t, tt.want, f.String(), "String()")
+ })
+ }
+}
+
+func Test_future_complete(t *testing.T) {
+ tests := []struct {
+ name string
+ verifier func(*testing.T, *future)
+ }{
+ {
+ name: "complete completes",
+ verifier: func(t *testing.T, f *future) {
+ assert.True(t, f.completed.Load())
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f := &future{}
+ f.complete()
+ tt.verifier(t, f)
+ })
+ }
+}
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 85f3ef8064..d83e770dc9 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -21,97 +21,13 @@ package pool
import (
"context"
- "fmt"
"github.com/apache/plc4x/plc4go/spi/options"
- "github.com/apache/plc4x/plc4go/spi/utils"
- "github.com/pkg/errors"
- "github.com/rs/zerolog"
"io"
- "runtime/debug"
- "sync"
- "sync/atomic"
"time"
)
type Runnable func()
-type worker struct {
- id int
- shutdown atomic.Bool
- interrupted atomic.Bool
- interrupter chan struct{}
- executor interface {
- isTraceWorkers() bool
- getWorksItems() chan workItem
- getWorkerWaitGroup() *sync.WaitGroup
- }
- hasEnded atomic.Bool
- lastReceived time.Time
-
- log zerolog.Logger
-}
-
-func (w *worker) initialize() {
- w.shutdown.Store(false)
- w.interrupted.Store(false)
- w.interrupter = make(chan struct{}, 1)
- w.hasEnded.Store(false)
- w.lastReceived = time.Now()
-}
-
-func (w *worker) work() {
- w.executor.getWorkerWaitGroup().Add(1)
- defer w.executor.getWorkerWaitGroup().Done()
- defer func() {
- if err := recover(); err != nil {
- w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
- }
- if !w.shutdown.Load() {
- // if we are not in shutdown we continue
- w.work()
- }
- }()
- workerLog := w.log.With().Int("Worker id", w.id).Logger()
- if !w.executor.isTraceWorkers() {
- workerLog = zerolog.Nop()
- }
- workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
- w.hasEnded.Store(false)
- workerLog.Debug().Msgf("setting to not ended")
-
- for !w.shutdown.Load() {
- workerLog.Debug().Msg("Working")
- select {
- case _workItem := <-w.executor.getWorksItems():
- w.lastReceived = time.Now()
- workerLog.Debug().Msgf("Got work item %v", _workItem)
- if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
- workerLog.Debug().Msg("We need to stop")
- // TODO: do we need to complete with a error?
- } else {
- workerLog.Debug().Msgf("Running work item %v", _workItem)
- _workItem.runnable()
- _workItem.completionFuture.complete()
- workerLog.Debug().Msgf("work item %v completed", _workItem)
- }
- case <-w.interrupter:
- workerLog.Debug().Msg("We got interrupted")
- }
- }
- w.hasEnded.Store(true)
- workerLog.Debug().Msg("setting to ended")
-}
-
-type workItem struct {
- workItemId int32
- runnable Runnable
- completionFuture *future
-}
-
-func (w workItem) String() string {
- return fmt.Sprintf("Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}", w.workItemId, w.runnable != nil, w.completionFuture)
-}
-
type Executor interface {
io.Closer
Start()
@@ -120,43 +36,6 @@ type Executor interface {
IsRunning() bool
}
-type executor struct {
- running bool
- shutdown bool
- stateChange sync.Mutex
- worker []*worker
- queueDepth int
- workItems chan workItem
- traceWorkers bool
-
- workerWaitGroup sync.WaitGroup
-
- log zerolog.Logger
-}
-
-func (e *executor) isTraceWorkers() bool {
- return e.traceWorkers
-}
-
-func (e *executor) getWorksItems() chan workItem {
- return e.workItems
-}
-
-func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
- return &e.workerWaitGroup
-}
-
-type dynamicExecutor struct {
- *executor
-
- maxNumberOfWorkers int
- currentNumberOfWorkers atomic.Int32
- dynamicStateChange sync.Mutex
- interrupter chan struct{}
-
- dynamicWorkers sync.WaitGroup
-}
-
func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
workers := make([]*worker, numberOfWorkers)
customLogger := options.ExtractCustomLogger(_options...)
@@ -184,10 +63,6 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
return _executor
}
-var upScaleInterval = 100 * time.Millisecond
-var downScaleInterval = 5 * time.Second
-var timeToBecomeUnused = 5 * time.Second
-
func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
customLogger := options.ExtractCustomLogger(_options...)
_executor := &dynamicExecutor{
@@ -219,265 +94,19 @@ func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
return &tracerWorkersOption{traceWorkers: traceWorkers}
}
+///////////////////////////////////////
+///////////////////////////////////////
+//
+// Internal section
+//
+
type tracerWorkersOption struct {
options.Option
traceWorkers bool
}
-func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
- if runnable == nil {
- value := atomic.Value{}
- value.Store(errors.New("runnable must not be nil"))
- return &future{err: value}
- }
- e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
- completionFuture := &future{}
- if e.shutdown {
- completionFuture.Cancel(false, errors.New("executor in shutdown"))
- return completionFuture
- }
- select {
- case e.workItems <- workItem{
- workItemId: workItemId,
- runnable: runnable,
- completionFuture: completionFuture,
- }:
- e.log.Trace().Msg("Item added")
- case <-ctx.Done():
- completionFuture.Cancel(false, ctx.Err())
- }
-
- e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
- return completionFuture
-}
-
-func (e *executor) Start() {
- e.stateChange.Lock()
- defer e.stateChange.Unlock()
- if e.running || e.shutdown {
- e.log.Warn().Msg("Already started")
- return
- }
- e.running = true
- e.shutdown = false
- for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.initialize()
- go worker.work()
- }
-}
-
-func (e *dynamicExecutor) Start() {
- e.dynamicStateChange.Lock()
- defer e.dynamicStateChange.Unlock()
- if e.running || e.shutdown {
- e.log.Warn().Msg("Already started")
- return
- }
- if e.interrupter != nil {
- e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
- close(e.interrupter)
- e.dynamicWorkers.Wait()
- }
-
- e.executor.Start()
- mutex := sync.Mutex{}
- e.interrupter = make(chan struct{})
- // Worker spawner
- go func() {
- e.dynamicWorkers.Add(1)
- defer e.dynamicWorkers.Done()
- defer func() {
- if err := recover(); err != nil {
- e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
- }
- }()
- workerLog := e.log.With().Str("Worker type", "spawner").Logger()
- if !e.traceWorkers {
- workerLog = zerolog.Nop()
- }
- for e.running && !e.shutdown {
- workerLog.Trace().Msg("running")
- mutex.Lock()
- numberOfItemsInQueue := len(e.workItems)
- numberOfWorkers := len(e.worker)
- workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
- if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
- workerLog.Trace().Msg("spawning new worker")
- _worker := &worker{
- id: numberOfWorkers - 1,
- interrupter: make(chan struct{}, 1),
- executor: e,
- lastReceived: time.Now(),
- log: e.log,
- }
- e.worker = append(e.worker, _worker)
- _worker.initialize()
- workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
- go _worker.work()
- e.currentNumberOfWorkers.Add(1)
- } else {
- workerLog.Trace().Msg("Nothing to scale")
- }
- mutex.Unlock()
- func() {
- workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
- timer := time.NewTimer(upScaleInterval)
- defer utils.CleanupTimer(timer)
- select {
- case <-timer.C:
- case <-e.interrupter:
- workerLog.Info().Msg("interrupted")
- }
- }()
- }
- workerLog.Info().Msg("Terminated")
- }()
- // Worker killer
- go func() {
- e.dynamicWorkers.Add(1)
- defer e.dynamicWorkers.Done()
- defer func() {
- if err := recover(); err != nil {
- e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
- }
- }()
- workerLog := e.log.With().Str("Worker type", "killer").Logger()
- if !e.traceWorkers {
- workerLog = zerolog.Nop()
- }
- for e.running && !e.shutdown {
- workerLog.Trace().Msg("running")
- mutex.Lock()
- newWorkers := make([]*worker, 0)
- for _, _worker := range e.worker {
- deadline := time.Now().Add(-timeToBecomeUnused)
- workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
- if _worker.lastReceived.Before(deadline) {
- workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
- _worker.interrupted.Store(true)
- close(_worker.interrupter)
- e.currentNumberOfWorkers.Add(-1)
- } else {
- workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
- newWorkers = append(newWorkers, _worker)
- }
- }
- e.worker = newWorkers
- mutex.Unlock()
- func() {
- workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
- timer := time.NewTimer(downScaleInterval)
- defer utils.CleanupTimer(timer)
- select {
- case <-timer.C:
- case <-e.interrupter:
- workerLog.Info().Msg("interrupted")
- }
- }()
- }
- workerLog.Info().Msg("Terminated")
- }()
-}
-
-func (e *executor) Stop() {
- e.log.Trace().Msg("stopping now")
- e.stateChange.Lock()
- defer e.stateChange.Unlock()
- if !e.running || e.shutdown {
- e.log.Warn().Msg("already stopped")
- return
- }
- e.shutdown = true
- for i := 0; i < len(e.worker); i++ {
- worker := e.worker[i]
- worker.shutdown.Store(true)
- worker.interrupted.Store(true)
- close(worker.interrupter)
- }
- e.running = false
- e.shutdown = false
- e.log.Debug().Msgf("waiting for %d workers to stop", len(e.worker))
- e.workerWaitGroup.Wait()
- e.log.Trace().Msg("stopped")
-}
-
-func (e *dynamicExecutor) Stop() {
- e.log.Trace().Msg("stopping now")
- e.dynamicStateChange.Lock()
- defer e.dynamicStateChange.Unlock()
- if !e.running || e.shutdown {
- e.log.Warn().Msg("already stopped")
- return
- }
- close(e.interrupter)
- e.log.Trace().Msg("stopping inner executor")
- e.executor.Stop()
- e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
- e.dynamicWorkers.Wait()
- e.log.Trace().Msg("stopped")
-}
-
-func (e *executor) Close() error {
- e.Stop()
- return nil
-}
-
-func (e *executor) IsRunning() bool {
- return e.running && !e.shutdown
-}
-
-type CompletionFuture interface {
- AwaitCompletion(ctx context.Context) error
- Cancel(interrupt bool, err error)
-}
-
-type future struct {
- cancelRequested atomic.Bool
- interruptRequested atomic.Bool
- completed atomic.Bool
- errored atomic.Bool
- err atomic.Value
-}
-
-func (f *future) Cancel(interrupt bool, err error) {
- f.cancelRequested.Store(true)
- f.interruptRequested.Store(interrupt)
- if err != nil {
- f.errored.Store(true)
- f.err.Store(err)
- }
-}
-
-func (f *future) complete() {
- f.completed.Store(true)
-}
-
-// Canceled is returned on CompletionFuture.AwaitCompletion when a CompletionFuture was canceled
-var Canceled = errors.New("Canceled")
-
-func (f *future) AwaitCompletion(ctx context.Context) error {
- for !f.completed.Load() && !f.errored.Load() && !f.cancelRequested.Load() && ctx.Err() == nil {
- time.Sleep(time.Millisecond * 10)
- }
- if err := ctx.Err(); err != nil {
- return err
- }
- if err, ok := f.err.Load().(error); ok {
- return err
- }
- if f.cancelRequested.Load() {
- return Canceled
- }
- return nil
-}
-
-func (f *future) String() string {
- return fmt.Sprintf("future: cancelRequested(%t), interruptRequested(%t), completed(%t), errored(%t), err(%v)",
- f.cancelRequested.Load(),
- f.interruptRequested.Load(),
- f.completed.Load(),
- f.errored.Load(),
- f.err.Load(),
- )
-}
+//
+// Internal section
+//
+///////////////////////////////////////
+///////////////////////////////////////
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index 9a62a09d25..b531d94e7e 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -21,9 +21,7 @@ package pool
import (
"context"
- "fmt"
"github.com/apache/plc4x/plc4go/spi/options"
- "github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"math/rand"
@@ -32,212 +30,6 @@ import (
"time"
)
-func TestExecutor_Start(t *testing.T) {
- type fields struct {
- running bool
- shutdown bool
- worker []*worker
- queue chan workItem
- traceWorkers bool
- }
- tests := []struct {
- name string
- fields fields
- shouldRun bool
- }{
- {
- name: "Start fresh",
- shouldRun: true,
- },
- {
- name: "Start running",
- fields: fields{
- running: true,
- },
- shouldRun: true,
- },
- {
- name: "Start stopping",
- fields: fields{
- running: true,
- shutdown: true,
- },
- shouldRun: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- e := &executor{
- running: tt.fields.running,
- shutdown: tt.fields.shutdown,
- worker: tt.fields.worker,
- workItems: tt.fields.queue,
- traceWorkers: tt.fields.traceWorkers,
- }
- e.Start()
- assert.Equal(t, tt.shouldRun, e.IsRunning(), "should be running")
- })
- }
-}
-
-func TestExecutor_Stop(t *testing.T) {
- type fields struct {
- running bool
- shutdown bool
- worker []*worker
- queue chan workItem
- traceWorkers bool
- }
- tests := []struct {
- name string
- fields fields
- shouldRun bool
- }{
- {
- name: "Stop stopped",
- shouldRun: false,
- },
- {
- name: "Stop running",
- fields: fields{
- running: true,
- queue: make(chan workItem),
- worker: []*worker{
- func() *worker {
- w := &worker{}
- w.initialize()
- return w
- }(),
- },
- },
- shouldRun: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- e := &executor{
- running: tt.fields.running,
- shutdown: tt.fields.shutdown,
- worker: tt.fields.worker,
- workItems: tt.fields.queue,
- traceWorkers: tt.fields.traceWorkers,
- }
- e.Stop()
- })
- }
-}
-
-func TestExecutor_Submit(t *testing.T) {
- type fields struct {
- running bool
- shutdown bool
- worker []*worker
- queue chan workItem
- traceWorkers bool
- }
- type args struct {
- workItemId int32
- runnable Runnable
- context context.Context
- }
- tests := []struct {
- name string
- fields fields
- args args
- completionFutureValidator func(t *testing.T, future CompletionFuture) bool
- waitForCompletion bool
- }{
- {
- name: "submitting nothing",
- completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
- return assert.Error(t, completionFuture.(*future).err.Load().(error))
- },
- },
- {
- name: "submit canceled",
- fields: fields{
- queue: make(chan workItem, 0),
- },
- args: args{
- workItemId: 13,
- runnable: func() {
- // We do something for 3 seconds
- <-time.NewTimer(3 * time.Second).C
- },
- context: func() context.Context {
- ctx, cancelFunc := context.WithCancel(context.Background())
- cancelFunc()
- return ctx
- }(),
- },
- completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
- err := completionFuture.(*future).err.Load().(error)
- return assert.Error(t, err)
- },
- },
- {
- name: "Submit something which doesn't complete",
- fields: fields{
- queue: make(chan workItem, 1),
- },
- args: args{
- workItemId: 13,
- runnable: func() {
- // We do something for 3 seconds
- <-time.NewTimer(3 * time.Second).C
- },
- context: context.TODO(),
- },
- completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
- completed := completionFuture.(*future).completed.Load()
- return assert.False(t, completed)
- },
- },
- {
- name: "Submit something which does complete",
- fields: func() fields {
- var executor = NewFixedSizeExecutor(1, 1).(*executor)
- return fields{
- running: executor.running,
- shutdown: executor.shutdown,
- worker: executor.worker,
- queue: executor.workItems,
- traceWorkers: true,
- }
- }(),
- args: args{
- workItemId: 13,
- runnable: func() {
- // NOOP
- },
- context: context.TODO(),
- },
- completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
- completed := completionFuture.(*future).completed.Load()
- return assert.True(t, completed)
- },
- waitForCompletion: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- e := &executor{
- running: tt.fields.running,
- shutdown: tt.fields.shutdown,
- worker: tt.fields.worker,
- workItems: tt.fields.queue,
- traceWorkers: tt.fields.traceWorkers,
- }
- e.Start()
- completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
- if tt.waitForCompletion {
- assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
- }
- assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
- })
- }
-}
-
func TestNewFixedSizeExecutor(t *testing.T) {
type args struct {
numberOfWorkers int
@@ -407,487 +199,6 @@ func TestWithExecutorOptionTracerWorkers(t *testing.T) {
}
}
-func TestWorkItem_String(t *testing.T) {
- type fields struct {
- workItemId int32
- runnable Runnable
- completionFuture *future
- }
- tests := []struct {
- name string
- fields fields
- want string
- }{
- {
- name: "Simple test",
- want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- w := &workItem{
- workItemId: tt.fields.workItemId,
- runnable: tt.fields.runnable,
- completionFuture: tt.fields.completionFuture,
- }
- assert.Equalf(t, tt.want, w.String(), "String()")
- })
- }
-}
-
-func TestWorker_work(t *testing.T) {
- type fields struct {
- id int
- executor *executor
- }
- tests := []struct {
- name string
- fields fields
- timeBeforeFirstValidation time.Duration
- firstValidation func(*testing.T, *worker)
- timeBeforeManipulation time.Duration
- manipulator func(*worker)
- timeBeforeSecondValidation time.Duration
- secondValidation func(*testing.T, *worker)
- }{
- {
- name: "Worker should work till shutdown (even if it panics)",
- fields: fields{
- id: 0,
- executor: func() *executor {
- e := &executor{
- workItems: make(chan workItem),
- traceWorkers: true,
- }
- go func() {
- e.workItems <- workItem{
- workItemId: 0,
- runnable: func() {
- panic("Oh no what should I do???")
- },
- completionFuture: &future{},
- }
- }()
- return e
- }(),
- },
- timeBeforeFirstValidation: 50 * time.Millisecond,
- firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
- },
- manipulator: func(w *worker) {
- w.shutdown.Store(true)
- w.interrupter <- struct{}{}
- },
- timeBeforeSecondValidation: 150 * time.Millisecond,
- secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
- },
- },
- {
- name: "Worker should work till shutdown",
- fields: fields{
- id: 1,
- executor: func() *executor {
- e := &executor{
- workItems: make(chan workItem),
- traceWorkers: true,
- }
- go func() {
- e.workItems <- workItem{
- workItemId: 0,
- runnable: func() {
- time.Sleep(time.Millisecond * 70)
- },
- completionFuture: &future{},
- }
- }()
- return e
- }(),
- },
- timeBeforeFirstValidation: 50 * time.Millisecond,
- firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
- },
- manipulator: func(w *worker) {
- w.shutdown.Store(true)
- },
- timeBeforeSecondValidation: 150 * time.Millisecond,
- secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
- },
- },
- {
- name: "Work interrupted",
- fields: fields{
- id: 1,
- executor: func() *executor {
- e := &executor{
- workItems: make(chan workItem),
- traceWorkers: true,
- }
- return e
- }(),
- },
- timeBeforeFirstValidation: 50 * time.Millisecond,
- firstValidation: func(t *testing.T, w *worker) {
- assert.False(t, w.hasEnded.Load(), "should not be ended")
- },
- manipulator: func(w *worker) {
- w.shutdown.Store(true)
- w.interrupter <- struct{}{}
- },
- timeBeforeSecondValidation: 150 * time.Millisecond,
- secondValidation: func(t *testing.T, w *worker) {
- assert.True(t, w.hasEnded.Load(), "should be ended")
- },
- },
- {
- name: "Work on canceled",
- fields: fields{
- id: 1,
- executor: func() *executor {
- e := &executor{
- workItems: make(chan workItem),
- traceWorkers: true,
- }
- go func() {
- completionFuture := &future{}
- completionFuture.cancelRequested.Store(true)
- e.workItems <- workItem{
- workItemId: 0,
- runnable: func() {
- time.Sleep(time.Millisecond * 70)
- },
- completionFuture: completionFuture,
- }
- }()
- return e
- }(),
- },
- timeBeforeManipulation: 50 * time.Millisecond,
- manipulator: func(w *worker) {
- w.shutdown.Store(true)
- w.interrupter <- struct{}{}
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- w := &worker{
- id: tt.fields.id,
- interrupter: make(chan struct{}, 1),
- executor: tt.fields.executor,
- }
- go w.work()
- if tt.firstValidation != nil {
- time.Sleep(tt.timeBeforeFirstValidation)
- t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
- tt.firstValidation(t, w)
- }
- if tt.manipulator != nil {
- time.Sleep(tt.timeBeforeManipulation)
- t.Logf("manipulator after %v", tt.timeBeforeManipulation)
- tt.manipulator(w)
- }
- if tt.secondValidation != nil {
- time.Sleep(tt.timeBeforeSecondValidation)
- t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
- tt.secondValidation(t, w)
- }
- })
- }
-}
-
-func Test_future_AwaitCompletion(t *testing.T) {
- type args struct {
- ctx context.Context
- }
- tests := []struct {
- name string
- args args
- completer func(*future)
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "completes with error",
- args: args{ctx: context.TODO()},
- completer: func(f *future) {
- f.Cancel(false, errors.New("Uh oh"))
- },
- wantErr: assert.Error,
- },
- {
- name: "completes regular",
- args: args{ctx: context.TODO()},
- completer: func(f *future) {
- time.Sleep(time.Millisecond * 30)
- f.complete()
- },
- wantErr: assert.NoError,
- },
- {
- name: "completes not int time",
- args: args{ctx: func() context.Context {
- deadline, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Millisecond))
- t.Cleanup(cancel)
- return deadline
- }()},
- completer: func(f *future) {
- time.Sleep(time.Millisecond * 300)
- },
- wantErr: assert.Error,
- },
- {
- name: "completes canceled without error",
- args: args{ctx: context.TODO()},
- completer: func(f *future) {
- time.Sleep(time.Millisecond * 300)
- f.Cancel(true, nil)
- },
- wantErr: func(t assert.TestingT, err error, i ...any) bool {
- assert.Same(t, Canceled, err)
- return true
- },
- },
- {
- name: "completes canceled with particular error",
- args: args{ctx: context.TODO()},
- completer: func(f *future) {
- time.Sleep(time.Millisecond * 300)
- f.Cancel(true, errors.New("Uh oh"))
- },
- wantErr: func(t assert.TestingT, err error, i ...any) bool {
- assert.Equal(t, "Uh oh", err.Error())
- return true
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- f := &future{}
- go tt.completer(f)
- tt.wantErr(t, f.AwaitCompletion(tt.args.ctx), fmt.Sprintf("AwaitCompletion(%v)", tt.args.ctx))
- })
- }
-}
-
-func Test_future_Cancel(t *testing.T) {
- type args struct {
- interrupt bool
- err error
- }
- tests := []struct {
- name string
- args args
- verifier func(*testing.T, *future)
- }{
- {
- name: "cancel cancels",
- verifier: func(t *testing.T, f *future) {
- assert.True(t, f.cancelRequested.Load())
- },
- },
- {
- name: "cancel with interrupt",
- args: args{
- interrupt: true,
- err: nil,
- },
- verifier: func(t *testing.T, f *future) {
- assert.True(t, f.cancelRequested.Load())
- assert.False(t, f.errored.Load())
- assert.Nil(t, f.err.Load())
- },
- },
- {
- name: "cancel with err",
- args: args{
- interrupt: true,
- err: errors.New("Uh Oh"),
- },
- verifier: func(t *testing.T, f *future) {
- assert.True(t, f.cancelRequested.Load())
- assert.True(t, f.errored.Load())
- assert.NotNil(t, f.err.Load())
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- f := &future{}
- f.Cancel(tt.args.interrupt, tt.args.err)
- tt.verifier(t, f)
- })
- }
-}
-
-func Test_future_complete(t *testing.T) {
- tests := []struct {
- name string
- verifier func(*testing.T, *future)
- }{
- {
- name: "complete completes",
- verifier: func(t *testing.T, f *future) {
- assert.True(t, f.completed.Load())
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- f := &future{}
- f.complete()
- tt.verifier(t, f)
- })
- }
-}
-
-func Test_dynamicExecutor_Start(t *testing.T) {
- type fields struct {
- executor *executor
- maxNumberOfWorkers int
- }
- tests := []struct {
- name string
- fields fields
- setup func(t *testing.T, fields *fields)
- startTwice bool
- }{
- {
- name: "just start",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- },
- {
- name: "start twice",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- startTwice: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- if tt.setup != nil {
- tt.setup(t, &tt.fields)
- }
- e := &dynamicExecutor{
- executor: tt.fields.executor,
- maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
- }
- e.Start()
- if tt.startTwice {
- e.Start()
- }
- // Let it work a bit
- time.Sleep(20 * time.Millisecond)
- t.Log("done with test")
- t.Cleanup(e.Stop)
- })
- }
-}
-
-func Test_dynamicExecutor_Stop(t *testing.T) {
- type fields struct {
- executor *executor
- maxNumberOfWorkers int
- interrupter chan struct{}
- }
- tests := []struct {
- name string
- fields fields
- setup func(t *testing.T, fields *fields)
- startIt bool
- stopTwice bool
- }{
- {
- name: "just stop",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- },
- {
- name: "stop started",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- },
- {
- name: "stop twice",
- fields: fields{
- executor: &executor{
- workItems: make(chan workItem, 1),
- worker: make([]*worker, 0),
- traceWorkers: true,
- },
- maxNumberOfWorkers: 100,
- },
- setup: func(t *testing.T, fields *fields) {
- fields.executor.log = produceTestLogger(t)
- fields.executor.workItems <- workItem{1, func() {}, &future{}}
- },
- stopTwice: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- if tt.setup != nil {
- tt.setup(t, &tt.fields)
- }
- e := &dynamicExecutor{
- executor: tt.fields.executor,
- maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
- interrupter: tt.fields.interrupter,
- }
- if tt.startIt {
- e.Start()
- }
- e.Stop()
- if tt.stopTwice {
- e.Stop()
- }
- })
- }
-}
-
// from: https://github.com/golang/go/issues/36532#issuecomment-575535452
func testContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
new file mode 100644
index 0000000000..22f8d20e5b
--- /dev/null
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/rs/zerolog"
+ "runtime/debug"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// upScaleInterval is how long the spawner goroutine sleeps between checks for adding a worker.
+var upScaleInterval = 100 * time.Millisecond
+// downScaleInterval is how long the killer goroutine sleeps between sweeps over the workers.
+var downScaleInterval = 5 * time.Second
+// timeToBecomeUnused is how long a worker may go without receiving a work item before the killer retires it.
+var timeToBecomeUnused = 5 * time.Second
+
+// dynamicExecutor wraps an executor and adds two background goroutines (started in Start)
+// that append workers when the queue is longer than the worker list and retire workers
+// that have not received work recently.
+type dynamicExecutor struct {
+ *executor
+
+ // maxNumberOfWorkers is the upper bound the spawner checks before adding a worker.
+ maxNumberOfWorkers int
+ // currentNumberOfWorkers is incremented by the spawner and decremented by the killer.
+ currentNumberOfWorkers atomic.Int32
+ // dynamicStateChange serializes Start and Stop.
+ dynamicStateChange sync.Mutex
+ // interrupter is closed to wake the spawner and killer out of their timed sleep.
+ interrupter chan struct{}
+
+ // dynamicWorkers tracks the spawner and killer goroutines.
+ dynamicWorkers sync.WaitGroup
+}
+
+// Start brings the dynamic executor up: it starts the embedded executor and launches two
+// background goroutines. The spawner periodically (every upScaleInterval) adds a worker
+// when more items are queued than there are workers and maxNumberOfWorkers is not reached.
+// The killer periodically (every downScaleInterval) retires workers that have not received
+// a work item for timeToBecomeUnused. Calling Start while already running or while in
+// shutdown only logs a warning.
+func (e *dynamicExecutor) Start() {
+	e.dynamicStateChange.Lock()
+	defer e.dynamicStateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	if e.interrupter != nil {
+		e.log.Debug().Msg("Ensuring that the old spawner/killers are not running")
+		// Stop leaves the interrupter closed, and closing a closed channel panics, so only
+		// close it when it is still open. In this file the channel is only ever closed,
+		// never sent on, so a successful receive means it is already closed.
+		select {
+		case <-e.interrupter:
+		default:
+			close(e.interrupter)
+		}
+		e.dynamicWorkers.Wait()
+	}
+
+	e.executor.Start()
+	// mutex guards e.worker between the spawner and the killer goroutine.
+	mutex := sync.Mutex{}
+	e.interrupter = make(chan struct{})
+	// Register both goroutines before launching them: an Add performed inside the
+	// goroutine can race with a concurrent Wait (e.g. from Stop) and be missed.
+	e.dynamicWorkers.Add(2)
+	// Worker spawner
+	go func() {
+		defer e.dynamicWorkers.Done()
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "spawner").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Trace().Msg("running")
+			mutex.Lock()
+			numberOfItemsInQueue := len(e.workItems)
+			numberOfWorkers := len(e.worker)
+			workerLog.Debug().Msgf("Checking if numberOfItemsInQueue(%d) > numberOfWorkers(%d) && numberOfWorkers(%d) < maxNumberOfWorkers(%d)", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, e.maxNumberOfWorkers)
+			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
+				workerLog.Trace().Msg("spawning new worker")
+				// NOTE(review): the id is -1 for the first worker spawned into an empty
+				// pool; kept as is because the id is only used for logging here.
+				_worker := &worker{
+					id:           numberOfWorkers - 1,
+					interrupter:  make(chan struct{}, 1),
+					executor:     e,
+					lastReceived: time.Now(),
+					log:          e.log,
+				}
+				e.worker = append(e.worker, _worker)
+				_worker.initialize()
+				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
+				go _worker.work()
+				e.currentNumberOfWorkers.Add(1)
+			} else {
+				workerLog.Trace().Msg("Nothing to scale")
+			}
+			mutex.Unlock()
+			// Sleep until the next check, or until the interrupter is closed.
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
+				timer := time.NewTimer(upScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
+		}
+		workerLog.Info().Msg("Terminated")
+	}()
+	// Worker killer
+	go func() {
+		defer e.dynamicWorkers.Done()
+		defer func() {
+			if err := recover(); err != nil {
+				e.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+			}
+		}()
+		workerLog := e.log.With().Str("Worker type", "killer").Logger()
+		if !e.traceWorkers {
+			workerLog = zerolog.Nop()
+		}
+		for e.running && !e.shutdown {
+			workerLog.Trace().Msg("running")
+			mutex.Lock()
+			newWorkers := make([]*worker, 0)
+			for _, _worker := range e.worker {
+				deadline := time.Now().Add(-timeToBecomeUnused)
+				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
+				if _worker.lastReceived.Before(deadline) {
+					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
+					// The worker loop only ends once shutdown is set; without it the
+					// worker would keep selecting on the closed interrupter and never
+					// leave the worker wait group.
+					_worker.shutdown.Store(true)
+					_worker.interrupted.Store(true)
+					close(_worker.interrupter)
+					e.currentNumberOfWorkers.Add(-1)
+				} else {
+					workerLog.Debug().Int("Worker id", _worker.id).Msg("still ok")
+					newWorkers = append(newWorkers, _worker)
+				}
+			}
+			e.worker = newWorkers
+			mutex.Unlock()
+			// Sleep until the next sweep, or until the interrupter is closed.
+			func() {
+				workerLog.Debug().Msgf("Sleeping for %v", downScaleInterval)
+				timer := time.NewTimer(downScaleInterval)
+				defer utils.CleanupTimer(timer)
+				select {
+				case <-timer.C:
+				case <-e.interrupter:
+					workerLog.Info().Msg("interrupted")
+				}
+			}()
+		}
+		workerLog.Info().Msg("Terminated")
+	}()
+}
+
+// Stop shuts the dynamic executor down: it closes the interrupter, stops the embedded
+// executor and then waits for the goroutines tracked by dynamicWorkers to finish.
+// Calling it while not running (or while already in shutdown) only logs a warning.
+func (e *dynamicExecutor) Stop() {
+ e.log.Trace().Msg("stopping now")
+ e.dynamicStateChange.Lock()
+ defer e.dynamicStateChange.Unlock()
+ if !e.running || e.shutdown {
+ e.log.Warn().Msg("already stopped")
+ return
+ }
+ // Wake the spawner/killer out of their timed sleep. The channel is left closed
+ // (not reset to nil) after Stop returns.
+ close(e.interrupter)
+ e.log.Trace().Msg("stopping inner executor")
+ e.executor.Stop()
+ // The logged count is currentNumberOfWorkers; the Wait below is on dynamicWorkers.
+ e.log.Debug().Msgf("waiting for %d dynamic workers to stop", e.currentNumberOfWorkers.Load())
+ e.dynamicWorkers.Wait()
+ e.log.Trace().Msg("stopped")
+}
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
new file mode 100644
index 0000000000..1db92a5d7f
--- /dev/null
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "testing"
+ "time"
+)
+
+// Test_dynamicExecutor_Start starts a dynamicExecutor (optionally twice) on top of a
+// hand-built executor holding one queued no-op work item, lets it run for a short
+// moment and registers Stop as cleanup.
+func Test_dynamicExecutor_Start(t *testing.T) {
+	type fields struct {
+		executor           *executor
+		maxNumberOfWorkers int
+	}
+	// Both cases start from the same state, so it is produced by small factories.
+	freshFields := func() fields {
+		return fields{
+			executor: &executor{
+				workItems:    make(chan workItem, 1),
+				worker:       make([]*worker, 0),
+				traceWorkers: true,
+			},
+			maxNumberOfWorkers: 100,
+		}
+	}
+	prepare := func(t *testing.T, f *fields) {
+		f.executor.log = produceTestLogger(t)
+		f.executor.workItems <- workItem{workItemId: 1, runnable: func() {}, completionFuture: &future{}}
+	}
+	testCases := []struct {
+		name       string
+		fields     fields
+		setup      func(t *testing.T, f *fields)
+		startTwice bool
+	}{
+		{name: "just start", fields: freshFields(), setup: prepare},
+		{name: "start twice", fields: freshFields(), setup: prepare, startTwice: true},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			if tc.setup != nil {
+				tc.setup(t, &tc.fields)
+			}
+			underTest := &dynamicExecutor{
+				executor:           tc.fields.executor,
+				maxNumberOfWorkers: tc.fields.maxNumberOfWorkers,
+			}
+			starts := 1
+			if tc.startTwice {
+				starts = 2
+			}
+			for i := 0; i < starts; i++ {
+				underTest.Start()
+			}
+			// Let it work a bit
+			time.Sleep(20 * time.Millisecond)
+			t.Log("done with test")
+			t.Cleanup(underTest.Stop)
+		})
+	}
+}
+
+// Test_dynamicExecutor_Stop builds a dynamicExecutor from each case's fields and calls
+// Stop on it, optionally a second time. The test only checks that the calls return.
+func Test_dynamicExecutor_Stop(t *testing.T) {
+ type fields struct {
+ executor *executor
+ maxNumberOfWorkers int
+ interrupter chan struct{}
+ }
+ tests := []struct {
+ name string
+ fields fields
+ setup func(t *testing.T, fields *fields)
+ // NOTE(review): no case below sets startIt, so every case (including "stop started")
+ // calls Stop on an executor that was never started — confirm whether that is intended.
+ startIt bool
+ stopTwice bool
+ }{
+ {
+ name: "just stop",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "stop started",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ },
+ {
+ name: "stop twice",
+ fields: fields{
+ executor: &executor{
+ workItems: make(chan workItem, 1),
+ worker: make([]*worker, 0),
+ traceWorkers: true,
+ },
+ maxNumberOfWorkers: 100,
+ },
+ setup: func(t *testing.T, fields *fields) {
+ fields.executor.log = produceTestLogger(t)
+ fields.executor.workItems <- workItem{1, func() {}, &future{}}
+ },
+ stopTwice: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // setup attaches the test logger and queues one no-op work item
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
+ e := &dynamicExecutor{
+ executor: tt.fields.executor,
+ maxNumberOfWorkers: tt.fields.maxNumberOfWorkers,
+ interrupter: tt.fields.interrupter,
+ }
+ if tt.startIt {
+ e.Start()
+ }
+ e.Stop()
+ if tt.stopTwice {
+ e.Stop()
+ }
+ })
+ }
+}
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
new file mode 100644
index 0000000000..79bddb6dc0
--- /dev/null
+++ b/plc4go/spi/pool/executor.go
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog"
+)
+
+// executor holds a set of workers which consume work items from a shared channel.
+type executor struct {
+ // running and shutdown together form the lifecycle state (see IsRunning)
+ running bool
+ shutdown bool
+ // stateChange is held by Start and Stop while they change the lifecycle state
+ stateChange sync.Mutex
+ // worker contains the workers that Start launches and Stop interrupts
+ worker []*worker
+ // queueDepth is not read in this file — presumably the capacity of workItems; TODO confirm
+ queueDepth int
+ // workItems is the channel Submit sends new work items to
+ workItems chan workItem
+ // traceWorkers is exposed to the workers via isTraceWorkers
+ traceWorkers bool
+
+ // workerWaitGroup is what Stop waits on after interrupting the workers
+ workerWaitGroup sync.WaitGroup
+
+ log zerolog.Logger
+}
+
+// isTraceWorkers returns the traceWorkers flag of this executor.
+func (e *executor) isTraceWorkers() bool {
+ return e.traceWorkers
+}
+
+// getWorksItems returns the channel that carries the submitted work items.
+func (e *executor) getWorksItems() chan workItem {
+ return e.workItems
+}
+
+// getWorkerWaitGroup returns a pointer to the wait group that Stop waits on.
+func (e *executor) getWorkerWaitGroup() *sync.WaitGroup {
+ return &e.workerWaitGroup
+}
+
+// Submit hands the runnable over to the work item channel and returns a CompletionFuture
+// for it.
+//
+// The returned future is already failed/canceled when:
+//   - runnable is nil (the future carries a "runnable must not be nil" error),
+//   - the executor is in shutdown,
+//   - ctx is done before the item could be placed on the channel (canceled with ctx.Err()).
+//
+// Otherwise the call blocks until the channel accepts the item.
+func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
+	if runnable == nil {
+		value := atomic.Value{}
+		value.Store(errors.New("runnable must not be nil"))
+		return &future{err: value}
+	}
+	e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
+	completionFuture := &future{}
+	if e.shutdown {
+		completionFuture.Cancel(false, errors.New("executor in shutdown"))
+		return completionFuture
+	}
+	select {
+	case e.workItems <- workItem{
+		workItemId:       workItemId,
+		runnable:         runnable,
+		completionFuture: completionFuture,
+	}:
+		e.log.Trace().Msg("Item added")
+	case <-ctx.Done():
+		// The item never reached the channel, so we must not report it as queued.
+		completionFuture.Cancel(false, ctx.Err())
+		e.log.Trace().Int32("workItemId", workItemId).Msg("runnable not queued: context done")
+		return completionFuture
+	}
+
+	e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
+	return completionFuture
+}
+
+// Start marks the executor as running and launches one goroutine per configured worker
+// after (re-)initializing it. If the executor is already running or in shutdown it only
+// logs a warning.
+func (e *executor) Start() {
+	e.stateChange.Lock()
+	defer e.stateChange.Unlock()
+	if e.running || e.shutdown {
+		e.log.Warn().Msg("Already started")
+		return
+	}
+	e.running, e.shutdown = true, false
+	for _, w := range e.worker {
+		w.initialize()
+		go w.work()
+	}
+}
+
+// Stop flags every worker for shutdown, closes its interrupter channel and then blocks
+// until the worker wait group is drained. If the executor is not running, or is in
+// shutdown, it only logs a warning.
+func (e *executor) Stop() {
+	e.log.Trace().Msg("stopping now")
+	e.stateChange.Lock()
+	defer e.stateChange.Unlock()
+	if stoppable := e.running && !e.shutdown; !stoppable {
+		e.log.Warn().Msg("already stopped")
+		return
+	}
+	e.shutdown = true
+	for _, w := range e.worker {
+		w.shutdown.Store(true)
+		w.interrupted.Store(true)
+		close(w.interrupter)
+	}
+	// The lifecycle flags are reset before waiting for the workers to finish.
+	e.running, e.shutdown = false, false
+	workerCount := len(e.worker)
+	e.log.Debug().Msgf("waiting for %d workers to stop", workerCount)
+	e.workerWaitGroup.Wait()
+	e.log.Trace().Msg("stopped")
+}
+
+// Close stops the executor by calling Stop. It always returns nil.
+func (e *executor) Close() error {
+ e.Stop()
+ return nil
+}
+
+// IsRunning reports whether the executor has been started and is not in shutdown.
+func (e *executor) IsRunning() bool {
+	if e.shutdown {
+		return false
+	}
+	return e.running
+}
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
new file mode 100644
index 0000000000..2a43b1366e
--- /dev/null
+++ b/plc4go/spi/pool/executor_test.go
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "context"
+ "fmt"
+ "github.com/rs/zerolog"
+ "github.com/stretchr/testify/assert"
+ "sync"
+ "testing"
+ "time"
+)
+
+// Test_executor_Close verifies that closing a zero-value executor reports no error.
+func Test_executor_Close(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	testCases := []struct {
+		name    string
+		fields  fields
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{name: "close it", wantErr: assert.NoError},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f := tc.fields
+			underTest := &executor{
+				running:      f.running,
+				shutdown:     f.shutdown,
+				worker:       f.worker,
+				queueDepth:   f.queueDepth,
+				workItems:    f.workItems,
+				traceWorkers: f.traceWorkers,
+				log:          f.log,
+			}
+			closeErr := underTest.Close()
+			tc.wantErr(t, closeErr, fmt.Sprintf("Close()"))
+		})
+	}
+}
+
+// Test_executor_IsRunning verifies that a zero-value executor reports not running.
+func Test_executor_IsRunning(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	testCases := []struct {
+		name   string
+		fields fields
+		want   bool
+	}{
+		{name: "no"},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f := tc.fields
+			underTest := &executor{
+				running:      f.running,
+				shutdown:     f.shutdown,
+				worker:       f.worker,
+				queueDepth:   f.queueDepth,
+				workItems:    f.workItems,
+				traceWorkers: f.traceWorkers,
+				log:          f.log,
+			}
+			got := underTest.IsRunning()
+			assert.Equalf(t, tc.want, got, "IsRunning()")
+		})
+	}
+}
+
+// Test_executor_Start calls Start on executors in different lifecycle states and checks
+// what IsRunning reports afterwards.
+func Test_executor_Start(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	testCases := []struct {
+		name      string
+		fields    fields
+		shouldRun bool
+	}{
+		{name: "Start fresh", shouldRun: true},
+		{name: "Start running", fields: fields{running: true}, shouldRun: true},
+		{name: "Start stopping", fields: fields{running: true, shutdown: true}, shouldRun: false},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f := tc.fields
+			underTest := &executor{
+				running:      f.running,
+				shutdown:     f.shutdown,
+				worker:       f.worker,
+				workItems:    f.queue,
+				traceWorkers: f.traceWorkers,
+			}
+			underTest.Start()
+			assert.Equal(t, tc.shouldRun, underTest.IsRunning(), "should be running")
+		})
+	}
+}
+
+// Test_executor_Stop calls Stop on a never-started executor and on one that is flagged
+// as running with a single initialized worker. It only checks that Stop returns.
+func Test_executor_Stop(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queue        chan workItem
+		traceWorkers bool
+	}
+	// An initialized worker whose work loop was never launched.
+	idleWorker := &worker{}
+	idleWorker.initialize()
+	testCases := []struct {
+		name      string
+		fields    fields
+		shouldRun bool
+	}{
+		{name: "Stop stopped", shouldRun: false},
+		{
+			name: "Stop running",
+			fields: fields{
+				running: true,
+				queue:   make(chan workItem),
+				worker:  []*worker{idleWorker},
+			},
+			shouldRun: false,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f := tc.fields
+			underTest := &executor{
+				running:      f.running,
+				shutdown:     f.shutdown,
+				worker:       f.worker,
+				workItems:    f.queue,
+				traceWorkers: f.traceWorkers,
+			}
+			underTest.Stop()
+		})
+	}
+}
+
+// Test_executor_Submit submits a runnable to a started executor and hands the returned
+// future to a per-case validator. Cases with waitForCompletion set await the future first.
+func Test_executor_Submit(t *testing.T) {
+ type fields struct {
+ running bool
+ shutdown bool
+ worker []*worker
+ queue chan workItem
+ traceWorkers bool
+ }
+ type args struct {
+ workItemId int32
+ runnable Runnable
+ context context.Context
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ completionFutureValidator func(t *testing.T, future CompletionFuture) bool
+ waitForCompletion bool
+ }{
+ {
+ // no runnable is given, so the future is expected to carry an error
+ name: "submitting nothing",
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ return assert.Error(t, completionFuture.(*future).err.Load().(error))
+ },
+ },
+ {
+ // unbuffered queue without workers and an already canceled context
+ name: "submit canceled",
+ fields: fields{
+ queue: make(chan workItem, 0),
+ },
+ args: args{
+ workItemId: 13,
+ runnable: func() {
+ // We do something for 3 seconds
+ <-time.NewTimer(3 * time.Second).C
+ },
+ context: func() context.Context {
+ ctx, cancelFunc := context.WithCancel(context.Background())
+ cancelFunc()
+ return ctx
+ }(),
+ },
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ err := completionFuture.(*future).err.Load().(error)
+ return assert.Error(t, err)
+ },
+ },
+ {
+ // the item fits into the buffered queue but no worker is configured to pick it up
+ name: "Submit something which doesn't complete",
+ fields: fields{
+ queue: make(chan workItem, 1),
+ },
+ args: args{
+ workItemId: 13,
+ runnable: func() {
+ // We do something for 3 seconds
+ <-time.NewTimer(3 * time.Second).C
+ },
+ context: context.TODO(),
+ },
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ completed := completionFuture.(*future).completed.Load()
+ return assert.False(t, completed)
+ },
+ },
+ {
+ // workers and queue are taken over from an executor built by NewFixedSizeExecutor
+ name: "Submit something which does complete",
+ fields: func() fields {
+ var executor = NewFixedSizeExecutor(1, 1).(*executor)
+ return fields{
+ running: executor.running,
+ shutdown: executor.shutdown,
+ worker: executor.worker,
+ queue: executor.workItems,
+ traceWorkers: true,
+ }
+ }(),
+ args: args{
+ workItemId: 13,
+ runnable: func() {
+ // NOOP
+ },
+ context: context.TODO(),
+ },
+ completionFutureValidator: func(t *testing.T, completionFuture CompletionFuture) bool {
+ completed := completionFuture.(*future).completed.Load()
+ return assert.True(t, completed)
+ },
+ waitForCompletion: true,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ e := &executor{
+ running: tt.fields.running,
+ shutdown: tt.fields.shutdown,
+ worker: tt.fields.worker,
+ workItems: tt.fields.queue,
+ traceWorkers: tt.fields.traceWorkers,
+ }
+ e.Start()
+ completionFuture := e.Submit(tt.args.context, tt.args.workItemId, tt.args.runnable)
+ if tt.waitForCompletion {
+ assert.NoError(t, completionFuture.AwaitCompletion(testContext(t)))
+ }
+ assert.True(t, tt.completionFutureValidator(t, completionFuture), "Submit(%v, %v)", tt.args.workItemId, tt.args.runnable)
+ })
+ }
+}
+
+// Test_executor_getWorkerWaitGroup verifies that a zero-value executor hands out a wait
+// group equal to a zero-value sync.WaitGroup.
+func Test_executor_getWorkerWaitGroup(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	testCases := []struct {
+		name   string
+		fields fields
+		want   *sync.WaitGroup
+	}{
+		{name: "get it", want: &sync.WaitGroup{}},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f := tc.fields
+			underTest := &executor{
+				running:      f.running,
+				shutdown:     f.shutdown,
+				worker:       f.worker,
+				queueDepth:   f.queueDepth,
+				workItems:    f.workItems,
+				traceWorkers: f.traceWorkers,
+				log:          f.log,
+			}
+			got := underTest.getWorkerWaitGroup()
+			assert.Equalf(t, tc.want, got, "getWorkerWaitGroup()")
+		})
+	}
+}
+
+// Test_executor_getWorksItems verifies that a zero-value executor returns a nil channel.
+func Test_executor_getWorksItems(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	testCases := []struct {
+		name   string
+		fields fields
+		want   chan workItem
+	}{
+		{name: "get it"},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f := tc.fields
+			underTest := &executor{
+				running:      f.running,
+				shutdown:     f.shutdown,
+				worker:       f.worker,
+				queueDepth:   f.queueDepth,
+				workItems:    f.workItems,
+				traceWorkers: f.traceWorkers,
+				log:          f.log,
+			}
+			got := underTest.getWorksItems()
+			assert.Equalf(t, tc.want, got, "getWorksItems()")
+		})
+	}
+}
+
+// Test_executor_isTraceWorkers verifies that a zero-value executor does not trace workers.
+func Test_executor_isTraceWorkers(t *testing.T) {
+	type fields struct {
+		running      bool
+		shutdown     bool
+		worker       []*worker
+		queueDepth   int
+		workItems    chan workItem
+		traceWorkers bool
+		log          zerolog.Logger
+	}
+	testCases := []struct {
+		name   string
+		fields fields
+		want   bool
+	}{
+		{name: "it is not"},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f := tc.fields
+			underTest := &executor{
+				running:      f.running,
+				shutdown:     f.shutdown,
+				worker:       f.worker,
+				queueDepth:   f.queueDepth,
+				workItems:    f.workItems,
+				traceWorkers: f.traceWorkers,
+				log:          f.log,
+			}
+			got := underTest.isTraceWorkers()
+			assert.Equalf(t, tc.want, got, "isTraceWorkers()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/workItem.go b/plc4go/spi/pool/workItem.go
new file mode 100644
index 0000000000..942480bc1a
--- /dev/null
+++ b/plc4go/spi/pool/workItem.go
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import "fmt"
+
+// workItem bundles a runnable with its id and the future used to track its completion.
+type workItem struct {
+ // workItemId is the id printed as "wid" by String
+ workItemId int32
+ // runnable is the function to be executed
+ runnable Runnable
+ // completionFuture is the future associated with this item
+ completionFuture *future
+}
+
+// String renders the work item id, whether a runnable is set and the completion future.
+// The exact layout, including its brace placement, is asserted verbatim by
+// Test_workItem_String.
+func (w workItem) String() string {
+	hasRunnable := w.runnable != nil
+	return fmt.Sprintf(
+		"Workitem{wid:%d, runnable(%t)}, completionFuture(%v)}",
+		w.workItemId,
+		hasRunnable,
+		w.completionFuture,
+	)
+}
diff --git a/plc4go/spi/pool/workItem_test.go b/plc4go/spi/pool/workItem_test.go
new file mode 100644
index 0000000000..ebc4b30c5c
--- /dev/null
+++ b/plc4go/spi/pool/workItem_test.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// Test_workItem_String pins the textual representation of a zero-value work item.
+func Test_workItem_String(t *testing.T) {
+	type fields struct {
+		workItemId       int32
+		runnable         Runnable
+		completionFuture *future
+	}
+	testCases := []struct {
+		name   string
+		fields fields
+		want   string
+	}{
+		{name: "Simple test", want: "Workitem{wid:0, runnable(false)}, completionFuture(<nil>)}"},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			item := &workItem{
+				workItemId:       tc.fields.workItemId,
+				runnable:         tc.fields.runnable,
+				completionFuture: tc.fields.completionFuture,
+			}
+			got := item.String()
+			assert.Equalf(t, tc.want, got, "String()")
+		})
+	}
+}
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
new file mode 100644
index 0000000000..7530ed0ad9
--- /dev/null
+++ b/plc4go/spi/pool/worker.go
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "runtime/debug"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/rs/zerolog"
+)
+
+// worker consumes work items from its executor's channel until it is flagged for shutdown.
+type worker struct {
+ id int
+ // shutdown ends the work loop once it is set to true
+ shutdown atomic.Bool
+ // interrupted, together with shutdown, makes work skip an item it has just received
+ interrupted atomic.Bool
+ // interrupter wakes up a worker which is blocked waiting for the next work item
+ interrupter chan struct{}
+ // executor is the narrow view of the owning executor that work relies on
+ executor interface {
+ isTraceWorkers() bool
+ getWorksItems() chan workItem
+ getWorkerWaitGroup() *sync.WaitGroup
+ }
+ // hasEnded is set to true when the work loop exits regularly
+ hasEnded atomic.Bool
+ // lastReceived is the time of initialization or of the last received work item
+ lastReceived time.Time
+
+ log zerolog.Logger
+}
+
+// initialize resets the worker: all state flags are cleared, a fresh buffered interrupter
+// channel is created and lastReceived is set to the current time.
+func (w *worker) initialize() {
+	w.interrupter = make(chan struct{}, 1)
+	w.lastReceived = time.Now()
+	for _, flag := range []*atomic.Bool{&w.shutdown, &w.interrupted, &w.hasEnded} {
+		flag.Store(false)
+	}
+}
+
+// work is the worker loop: it takes work items from the executor's channel and runs them
+// until shutdown is set. A panic is recovered and logged; unless the worker is in shutdown
+// the loop is then re-entered by calling work again.
+func (w *worker) work() {
+ // NOTE(review): the wait group is incremented here, i.e. inside the goroutine launched by
+ // executor.Start — a Wait racing with a fresh start may not see this worker yet; confirm.
+ w.executor.getWorkerWaitGroup().Add(1)
+ defer w.executor.getWorkerWaitGroup().Done()
+ defer func() {
+ if err := recover(); err != nil {
+ w.log.Error().Msgf("panic-ed %v. Stack: %s", err, debug.Stack())
+ }
+ if !w.shutdown.Load() {
+ // if we are not in shutdown we continue
+ w.work()
+ }
+ }()
+ // worker logging is silenced unless the executor has worker tracing enabled
+ workerLog := w.log.With().Int("Worker id", w.id).Logger()
+ if !w.executor.isTraceWorkers() {
+ workerLog = zerolog.Nop()
+ }
+ workerLog.Debug().Msgf("current ended state: %t", w.hasEnded.Load())
+ w.hasEnded.Store(false)
+ workerLog.Debug().Msgf("setting to not ended")
+
+ for !w.shutdown.Load() {
+ workerLog.Debug().Msg("Working")
+ select {
+ case _workItem := <-w.executor.getWorksItems():
+ w.lastReceived = time.Now()
+ workerLog.Debug().Msgf("Got work item %v", _workItem)
+ // skipped items are neither run nor completed
+ if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
+ workerLog.Debug().Msg("We need to stop")
+ // TODO: do we need to complete with a error?
+ } else {
+ workerLog.Debug().Msgf("Running work item %v", _workItem)
+ _workItem.runnable()
+ _workItem.completionFuture.complete()
+ workerLog.Debug().Msgf("work item %v completed", _workItem)
+ }
+ case <-w.interrupter:
+ // nothing to do here, the loop condition decides whether we go on
+ workerLog.Debug().Msg("We got interrupted")
+ }
+ }
+ // only reached when the loop exits regularly; a panic while in shutdown skips this
+ w.hasEnded.Store(true)
+ workerLog.Debug().Msg("setting to ended")
+}
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
new file mode 100644
index 0000000000..4f604d2efb
--- /dev/null
+++ b/plc4go/spi/pool/worker_test.go
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package pool
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/rs/zerolog"
+ "github.com/stretchr/testify/assert"
+)
+
+// Test_worker_initialize checks that initialize can be called on a zero-value worker.
+func Test_worker_initialize(t *testing.T) {
+	type fields struct {
+		id          int
+		interrupter chan struct{}
+		executor    interface {
+			isTraceWorkers() bool
+			getWorksItems() chan workItem
+			getWorkerWaitGroup() *sync.WaitGroup
+		}
+		lastReceived time.Time
+		log          zerolog.Logger
+	}
+	testCases := []struct {
+		name   string
+		fields fields
+	}{
+		{name: "do it"},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f := tc.fields
+			underTest := &worker{
+				id:           f.id,
+				interrupter:  f.interrupter,
+				executor:     f.executor,
+				lastReceived: f.lastReceived,
+				log:          f.log,
+			}
+			underTest.initialize()
+		})
+	}
+}
+
+// Test_worker_work runs a worker's loop in a goroutine and drives it through up to three
+// timed phases per case: a first validation, a manipulation of the worker and a second
+// validation. Each phase is preceded by a sleep of the configured duration.
+func Test_worker_work(t *testing.T) {
+ type fields struct {
+ id int
+ executor *executor
+ }
+ tests := []struct {
+ name string
+ fields fields
+ timeBeforeFirstValidation time.Duration
+ firstValidation func(*testing.T, *worker)
+ timeBeforeManipulation time.Duration
+ manipulator func(*worker)
+ timeBeforeSecondValidation time.Duration
+ secondValidation func(*testing.T, *worker)
+ }{
+ {
+ name: "Worker should work till shutdown (even if it panics)",
+ fields: fields{
+ id: 0,
+ executor: func() *executor {
+ e := &executor{
+ workItems: make(chan workItem),
+ traceWorkers: true,
+ }
+ // feed a single panicking work item into the unbuffered channel
+ go func() {
+ e.workItems <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ panic("Oh no what should I do???")
+ },
+ completionFuture: &future{},
+ }
+ }()
+ return e
+ }(),
+ },
+ timeBeforeFirstValidation: 50 * time.Millisecond,
+ firstValidation: func(t *testing.T, w *worker) {
+ assert.False(t, w.hasEnded.Load(), "should not be ended")
+ },
+ manipulator: func(w *worker) {
+ w.shutdown.Store(true)
+ w.interrupter <- struct{}{}
+ },
+ timeBeforeSecondValidation: 150 * time.Millisecond,
+ secondValidation: func(t *testing.T, w *worker) {
+ assert.True(t, w.hasEnded.Load(), "should be ended")
+ },
+ },
+ {
+ name: "Worker should work till shutdown",
+ fields: fields{
+ id: 1,
+ executor: func() *executor {
+ e := &executor{
+ workItems: make(chan workItem),
+ traceWorkers: true,
+ }
+ // feed a single work item which sleeps for 70ms
+ go func() {
+ e.workItems <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ time.Sleep(time.Millisecond * 70)
+ },
+ completionFuture: &future{},
+ }
+ }()
+ return e
+ }(),
+ },
+ timeBeforeFirstValidation: 50 * time.Millisecond,
+ firstValidation: func(t *testing.T, w *worker) {
+ assert.False(t, w.hasEnded.Load(), "should not be ended")
+ },
+ // shutdown is flagged without interrupting
+ manipulator: func(w *worker) {
+ w.shutdown.Store(true)
+ },
+ timeBeforeSecondValidation: 150 * time.Millisecond,
+ secondValidation: func(t *testing.T, w *worker) {
+ assert.True(t, w.hasEnded.Load(), "should be ended")
+ },
+ },
+ {
+ // no work item is ever queued in this case
+ name: "Work interrupted",
+ fields: fields{
+ id: 1,
+ executor: func() *executor {
+ e := &executor{
+ workItems: make(chan workItem),
+ traceWorkers: true,
+ }
+ return e
+ }(),
+ },
+ timeBeforeFirstValidation: 50 * time.Millisecond,
+ firstValidation: func(t *testing.T, w *worker) {
+ assert.False(t, w.hasEnded.Load(), "should not be ended")
+ },
+ manipulator: func(w *worker) {
+ w.shutdown.Store(true)
+ w.interrupter <- struct{}{}
+ },
+ timeBeforeSecondValidation: 150 * time.Millisecond,
+ secondValidation: func(t *testing.T, w *worker) {
+ assert.True(t, w.hasEnded.Load(), "should be ended")
+ },
+ },
+ {
+ name: "Work on canceled",
+ fields: fields{
+ id: 1,
+ executor: func() *executor {
+ e := &executor{
+ workItems: make(chan workItem),
+ traceWorkers: true,
+ }
+ // feed a work item whose future already has cancelRequested set
+ go func() {
+ completionFuture := &future{}
+ completionFuture.cancelRequested.Store(true)
+ e.workItems <- workItem{
+ workItemId: 0,
+ runnable: func() {
+ time.Sleep(time.Millisecond * 70)
+ },
+ completionFuture: completionFuture,
+ }
+ }()
+ return e
+ }(),
+ },
+ timeBeforeManipulation: 50 * time.Millisecond,
+ manipulator: func(w *worker) {
+ w.shutdown.Store(true)
+ w.interrupter <- struct{}{}
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ w := &worker{
+ id: tt.fields.id,
+ interrupter: make(chan struct{}, 1),
+ executor: tt.fields.executor,
+ }
+ go w.work()
+ // phases without a configured function are skipped, including their sleep
+ if tt.firstValidation != nil {
+ time.Sleep(tt.timeBeforeFirstValidation)
+ t.Logf("firstValidation after %v", tt.timeBeforeFirstValidation)
+ tt.firstValidation(t, w)
+ }
+ if tt.manipulator != nil {
+ time.Sleep(tt.timeBeforeManipulation)
+ t.Logf("manipulator after %v", tt.timeBeforeManipulation)
+ tt.manipulator(w)
+ }
+ if tt.secondValidation != nil {
+ time.Sleep(tt.timeBeforeSecondValidation)
+ t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
+ tt.secondValidation(t, w)
+ }
+ })
+ }
+}