You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 21:35:42 UTC
[plc4x] branch develop updated (c02f8f51de -> 5eca78479c)
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
from c02f8f51de chore(plc4go): re-order time declarations
new ac957705d6 refactor(plc4go/spi): move pool option to other options
new 5eca78479c feat(plc4go): add receive timeout option
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
plc4go/internal/cbus/CBusMessageMapper_test.go | 42 ++++++++++-----------
plc4go/internal/cbus/Discoverer_test.go | 3 +-
plc4go/pkg/api/config/config.go | 9 ++++-
plc4go/spi/default/DefaultCodec.go | 23 +++++++++++-
plc4go/spi/default/DefaultCodec_test.go | 18 +++++++--
plc4go/spi/options/Option.go | 43 ++++++++++++++++++++++
plc4go/spi/pool/WorkerPool.go | 37 +------------------
plc4go/spi/pool/WorkerPool_test.go | 28 ++------------
plc4go/spi/testutils/TestUtils.go | 5 +--
.../spi/transactions/RequestTransactionManager.go | 8 +++-
10 files changed, 123 insertions(+), 93 deletions(-)
[plc4x] 02/02: feat(plc4go): add receive timeout option
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 5eca78479cd542acbf0478f87606adf47798e6ea
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:35:34 2023 +0200
feat(plc4go): add receive timeout option
---
plc4go/internal/cbus/Discoverer_test.go | 3 +--
plc4go/pkg/api/config/config.go | 6 ++++++
plc4go/spi/default/DefaultCodec.go | 23 ++++++++++++++++++++++-
plc4go/spi/default/DefaultCodec_test.go | 18 +++++++++++++++---
plc4go/spi/options/Option.go | 22 ++++++++++++++++++++++
5 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 544ac2d843..9e8270929e 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -29,7 +29,6 @@ import (
"testing"
"time"
- "github.com/apache/plc4x/plc4go/pkg/api/config"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
@@ -156,7 +155,6 @@ func TestDiscoverer_Discover(t *testing.T) {
}
func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
- config.TraceDefaultMessageCodecWorker = true
type fields struct {
transportInstanceCreationQueue pool.Executor
deviceScanningQueue pool.Executor
@@ -200,6 +198,7 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
t.Logf("%d bytes written", write)
}()
t.Cleanup(func() {
+ t.Log("close listener")
if err := listen.Close(); err != nil {
t.Error(err)
}
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index 6e06449e3b..c731f181db 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -22,6 +22,7 @@ package config
import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/rs/zerolog"
+ "time"
)
// TraceTransactionManagerWorkers when set to true the transaction manager displays worker states in log
@@ -49,6 +50,11 @@ func WithPassLoggerToModel(passLogger bool) WithOption {
return options.WithPassLoggerToModel(passLogger)
}
+// WithReceiveTimeout sets a timeout for a receive-operation (similar to SO_RCVTIMEO)
+func WithReceiveTimeout(timeout time.Duration) WithOption {
+ return options.WithReceiveTimeout(timeout)
+}
+
// WithTraceTransactionManagerWorkers enables trace transaction manager workers
func WithTraceTransactionManagerWorkers(traceWorkers bool) WithOption {
return options.WithTraceTransactionManagerWorkers(traceWorkers)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 9b6efddbff..21e202d48e 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -87,6 +87,7 @@ type defaultCodec struct {
stateChange sync.Mutex
activeWorker sync.WaitGroup
+ receiveTimeout time.Duration
traceDefaultMessageCodecWorker bool
log zerolog.Logger `ignore:"true"`
@@ -108,6 +109,7 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
defaultIncomingMessageChannel: make(chan spi.Message, 100),
expectations: []spi.Expectation{},
customMessageHandling: customMessageHandler,
+ receiveTimeout: options.ExtractReceiveTimeout(_options...),
traceDefaultMessageCodecWorker: options.ExtractTraceDefaultMessageCodecWorker(_options...) || config.TraceDefaultMessageCodecWorker,
log: options.ExtractCustomLogger(_options...),
}
@@ -164,6 +166,7 @@ func (m *defaultCodec) Disconnect() error {
m.running.Store(false)
m.log.Trace().Msg("Waiting for worker to shutdown")
m.activeWorker.Wait()
+ m.log.Trace().Msg("worker shut down")
if m.transportInstance != nil {
if err := m.transportInstance.Close(); err != nil {
return errors.Wrap(err, "error closing transport instance")
@@ -318,7 +321,25 @@ mainLoop:
workerLog.Trace().Msg("Receiving message")
// Check for incoming messages.
- message, err := m.Receive()
+ var message spi.Message
+ var err error
+ {
+ syncer := make(chan struct{})
+ go func() {
+ message, err = m.Receive()
+ close(syncer)
+ }()
+ timeoutTimer := time.NewTimer(m.receiveTimeout)
+ select {
+ case <-syncer:
+ utils.CleanupTimer(timeoutTimer)
+ case <-timeoutTimer.C:
+ utils.CleanupTimer(timeoutTimer)
+ workerLog.Error().Msgf("receive timeout after %s", m.receiveTimeout)
+ continue mainLoop
+ }
+
+ }
if err != nil {
workerLog.Error().Err(err).Msg("got an error reading from transport")
time.Sleep(10 * time.Millisecond)
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index 19bde057ac..47ee666559 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -249,7 +249,8 @@ func TestNewDefaultCodec(t *testing.T) {
{
name: "create it",
want: &defaultCodec{
- expectations: []spi.Expectation{},
+ expectations: []spi.Expectation{},
+ receiveTimeout: 10 * time.Second,
},
},
}
@@ -298,7 +299,8 @@ func Test_buildDefaultCodec(t *testing.T) {
{
name: "build it",
want: &defaultCodec{
- expectations: []spi.Expectation{},
+ expectations: []spi.Expectation{},
+ receiveTimeout: 10 * time.Second,
},
},
{
@@ -309,7 +311,8 @@ func Test_buildDefaultCodec(t *testing.T) {
},
},
want: &defaultCodec{
- expectations: []spi.Expectation{},
+ expectations: []spi.Expectation{},
+ receiveTimeout: 10 * time.Second,
},
},
}
@@ -897,6 +900,10 @@ func Test_defaultCodec_Work(t *testing.T) {
codec.running.Store(true)
codec.activeWorker.Add(1)
},
+ mockSetup: func(t *testing.T, fields *fields, args *args) {
+ requirements := NewMockDefaultCodecRequirements(t)
+ fields.DefaultCodecRequirements = requirements
+ },
},
{
name: "work hard (panics everywhere)",
@@ -934,6 +941,11 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
+ mockSetup: func(t *testing.T, fields *fields, args *args) {
+ requirements := NewMockDefaultCodecRequirements(t)
+ requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
+ fields.DefaultCodecRequirements = requirements
+ },
manipulator: func(t *testing.T, codec *defaultCodec) {
codec.running.Store(true)
codec.activeWorker.Add(1)
diff --git a/plc4go/spi/options/Option.go b/plc4go/spi/options/Option.go
index 3d5907b88c..e3dbc9fc48 100644
--- a/plc4go/spi/options/Option.go
+++ b/plc4go/spi/options/Option.go
@@ -22,6 +22,7 @@ package options
import (
"context"
"github.com/rs/zerolog"
+ "time"
)
// WithOption is a marker interface for options supplied by the builders like WithDefaultTtl
@@ -59,6 +60,22 @@ func WithPassLoggerToModel(passLogger bool) WithOption {
return withPassLoggerToModel{passLogger: passLogger}
}
+// WithReceiveTimeout sets a timeout for a receive-operation (similar to SO_RCVTIMEO)
+func WithReceiveTimeout(timeout time.Duration) WithOption {
+ return withReceiveTimeout{timeout: timeout}
+}
+
+// ExtractReceiveTimeout to extract the receive-timeout for reading operations. Defaults to 10 seconds
+func ExtractReceiveTimeout(options ...WithOption) time.Duration {
+ for _, option := range options {
+ switch option := option.(type) {
+ case withReceiveTimeout:
+ return option.timeout
+ }
+ }
+ return 10 * time.Second
+}
+
// ExtractPassLoggerToModel to extract the flag indicating that model should be passed to Model
func ExtractPassLoggerToModel(options ...WithOption) bool {
for _, option := range options {
@@ -167,6 +184,11 @@ type withPassLoggerToModel struct {
passLogger bool
}
+type withReceiveTimeout struct {
+ Option
+ timeout time.Duration
+}
+
type withTraceTransactionManagerWorkers struct {
Option
traceWorkers bool
[plc4x] 01/02: refactor(plc4go/spi): move pool option to other options
Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit ac957705d69a694b5f8b3670cfa7e5e798df6cc9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:18:28 2023 +0200
refactor(plc4go/spi): move pool option to other options
---
plc4go/internal/cbus/CBusMessageMapper_test.go | 42 +++++++++++-----------
plc4go/pkg/api/config/config.go | 3 +-
plc4go/spi/options/Option.go | 21 +++++++++++
plc4go/spi/pool/WorkerPool.go | 37 ++-----------------
plc4go/spi/pool/WorkerPool_test.go | 28 ++-------------
plc4go/spi/testutils/TestUtils.go | 5 ++-
.../spi/transactions/RequestTransactionManager.go | 8 ++++-
7 files changed, 57 insertions(+), 87 deletions(-)
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index 3dbdf0f1ea..aa9a8a90a0 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1544,7 +1544,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -1619,7 +1619,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -1683,7 +1683,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -1737,7 +1737,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -1794,7 +1794,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -1871,7 +1871,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -1966,7 +1966,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2024,7 +2024,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2078,7 +2078,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2132,7 +2132,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2186,7 +2186,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2240,7 +2240,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2314,7 +2314,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2368,7 +2368,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2422,7 +2422,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2476,7 +2476,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2530,7 +2530,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2584,7 +2584,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2654,7 +2654,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2708,7 +2708,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
@@ -2762,7 +2762,7 @@ func TestMapEncodedReply(t *testing.T) {
options.WithCustomLogger(testutils.ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
transactions.WithCustomExecutor(
- pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t)), pool.WithExecutorOptionTracerWorkers(true)),
+ pool.NewFixedSizeExecutor(10, 50, options.WithCustomLogger(testutils.ProduceTestingLogger(t))),
),
)
t.Cleanup(func() {
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index ba08e54ee3..6e06449e3b 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -21,7 +21,6 @@ package config
import (
"github.com/apache/plc4x/plc4go/spi/options"
- "github.com/apache/plc4x/plc4go/spi/pool"
"github.com/rs/zerolog"
)
@@ -67,7 +66,7 @@ func WithTraceDefaultMessageCodecWorker(traceWorkers bool) WithOption {
// WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
- return pool.WithExecutorOptionTracerWorkers(traceWorkers)
+ return options.WithExecutorOptionTracerWorkers(traceWorkers)
}
// WithOption is a marker interface for options
diff --git a/plc4go/spi/options/Option.go b/plc4go/spi/options/Option.go
index 7ebe9641ae..3d5907b88c 100644
--- a/plc4go/spi/options/Option.go
+++ b/plc4go/spi/options/Option.go
@@ -118,6 +118,22 @@ func ExtractTraceDefaultMessageCodecWorker(options ...WithOption) bool {
return false
}
+// WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
+func WithExecutorOptionTracerWorkers(traceWorkers bool) WithOption {
+ return &withTracerExecutorWorkersOption{traceWorkers: traceWorkers}
+}
+
+// ExtractTracerWorkers returns the value from WithExecutorOptionTracerWorkers
+func ExtractTracerWorkers(_options ...WithOption) (traceWorkers bool, found bool) {
+ for _, option := range _options {
+ switch option := option.(type) {
+ case *withTracerExecutorWorkersOption:
+ return option.traceWorkers, true
+ }
+ }
+ return false, false
+}
+
// GetLoggerContextForModel returns a log context if the WithPassLoggerToModel WithOption is set
func GetLoggerContextForModel(ctx context.Context, log zerolog.Logger, options ...WithOption) context.Context {
passToModel := false
@@ -166,6 +182,11 @@ type withTraceDefaultMessageCodecWorker struct {
traceWorkers bool
}
+type withTracerExecutorWorkersOption struct {
+ Option
+ traceWorkers bool
+}
+
//
//
///////////////////////////////////////
diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 36dd72f494..c9e21b6581 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -60,7 +60,7 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
worker: workers,
log: customLogger,
}
- _executor.traceWorkers, _ = ExtractTracerWorkers(_options...)
+ _executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
for i := 0; i < numberOfWorkers; i++ {
workers[i].executor = _executor
}
@@ -77,7 +77,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
},
maxNumberOfWorkers: maxNumberOfWorkers,
}
- _executor.traceWorkers, _ = ExtractTracerWorkers(_options...)
+ _executor.traceWorkers, _ = options.ExtractTracerWorkers(_options...)
// We spawn one initial worker
w := worker{
id: 0,
@@ -89,36 +89,3 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
_executor.worker = append(_executor.worker, &w)
return _executor
}
-
-// WithExecutorOptionTracerWorkers sets a flag which extends logging for workers
-func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
- return &tracerWorkersOption{traceWorkers: traceWorkers}
-}
-
-// ExtractTracerWorkers returns the value from WithExecutorOptionTracerWorkers
-func ExtractTracerWorkers(_options ...options.WithOption) (traceWorkers bool, found bool) {
- for _, option := range _options {
- switch option := option.(type) {
- case *tracerWorkersOption:
- return option.traceWorkers, true
- }
- }
- return false, false
-}
-
-///////////////////////////////////////
-///////////////////////////////////////
-//
-// Internal section
-//
-
-type tracerWorkersOption struct {
- options.Option
- traceWorkers bool
-}
-
-//
-// Internal section
-//
-///////////////////////////////////////
-///////////////////////////////////////
diff --git a/plc4go/spi/pool/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
index fc67a926ac..5e67092467 100644
--- a/plc4go/spi/pool/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -46,7 +46,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
args: args{
numberOfWorkers: 13,
queueDepth: 14,
- options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
+ options: []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
},
setup: func(t *testing.T, args *args) {
args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
@@ -86,7 +86,7 @@ func TestNewDynamicExecutor(t *testing.T) {
args: args{
numberOfWorkers: 13,
queueDepth: 14,
- options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
+ options: []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
},
setup: func(t *testing.T, args *args) {
args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
@@ -104,7 +104,7 @@ func TestNewDynamicExecutor(t *testing.T) {
args: args{
numberOfWorkers: 2,
queueDepth: 2,
- options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
+ options: []options.WithOption{options.WithExecutorOptionTracerWorkers(true)},
},
setup: func(t *testing.T, args *args) {
args.options = append(args.options, options.WithCustomLogger(produceTestingLogger(t)))
@@ -175,25 +175,3 @@ func TestNewDynamicExecutor(t *testing.T) {
})
}
}
-
-func TestWithExecutorOptionTracerWorkers(t *testing.T) {
- type args struct {
- traceWorkers bool
- }
- tests := []struct {
- name string
- args args
- executorValidator options.WithOption
- }{
- {
- name: "option should set option",
- args: args{traceWorkers: true},
- executorValidator: &tracerWorkersOption{traceWorkers: true},
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- assert.Equal(t, tt.executorValidator, WithExecutorOptionTracerWorkers(tt.args.traceWorkers))
- })
- }
-}
diff --git a/plc4go/spi/testutils/TestUtils.go b/plc4go/spi/testutils/TestUtils.go
index e3937a7ff3..66c1b6f907 100644
--- a/plc4go/spi/testutils/TestUtils.go
+++ b/plc4go/spi/testutils/TestUtils.go
@@ -22,7 +22,6 @@ package testutils
import (
"context"
"github.com/apache/plc4x/plc4go/spi/options"
- "github.com/apache/plc4x/plc4go/spi/pool"
"github.com/apache/plc4x/plc4go/spi/utils"
"os"
"runtime/debug"
@@ -155,14 +154,14 @@ func ProduceTestingLogger(t *testing.T) zerolog.Logger {
// EnrichOptionsWithOptionsForTesting appends options useful for testing to config.WithOption s
func EnrichOptionsWithOptionsForTesting(t *testing.T, _options ...options.WithOption) []options.WithOption {
traceWorkers := true
- if extractedTraceWorkers, found := pool.ExtractTracerWorkers(_options...); found {
+ if extractedTraceWorkers, found := options.ExtractTracerWorkers(_options...); found {
traceWorkers = extractedTraceWorkers
}
// TODO: apply to other options like above
return append(_options,
options.WithCustomLogger(ProduceTestingLogger(t)),
options.WithPassLoggerToModel(true),
- pool.WithExecutorOptionTracerWorkers(traceWorkers),
+ options.WithExecutorOptionTracerWorkers(traceWorkers),
)
}
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index ffbc4710f2..55cef95f96 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -24,6 +24,7 @@ import (
"context"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
+ "github.com/rs/zerolog/log"
"io"
"runtime"
"sync"
@@ -39,7 +40,12 @@ import (
var sharedExecutorInstance pool.Executor // shared instance
func init() {
- sharedExecutorInstance = pool.NewFixedSizeExecutor(runtime.NumCPU(), 100, pool.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers))
+ sharedExecutorInstance = pool.NewFixedSizeExecutor(
+ runtime.NumCPU(),
+ 100,
+ options.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers),
+ config.WithCustomLogger(log.With().Str("executorInstance", "shared logger").Logger()),
+ )
sharedExecutorInstance.Start()
runtime.SetFinalizer(sharedExecutorInstance, func(sharedExecutorInstance pool.Executor) {
sharedExecutorInstance.Stop()