You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 21:35:44 UTC
[plc4x] 02/02: feat(plc4go): add receive timeout option
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 5eca78479cd542acbf0478f87606adf47798e6ea
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:35:34 2023 +0200
feat(plc4go): add receive timeout option
---
plc4go/internal/cbus/Discoverer_test.go | 3 +--
plc4go/pkg/api/config/config.go | 6 ++++++
plc4go/spi/default/DefaultCodec.go | 23 ++++++++++++++++++++++-
plc4go/spi/default/DefaultCodec_test.go | 18 +++++++++++++++---
plc4go/spi/options/Option.go | 22 ++++++++++++++++++++++
5 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 544ac2d843..9e8270929e 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -29,7 +29,6 @@ import (
"testing"
"time"
- "github.com/apache/plc4x/plc4go/pkg/api/config"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/apache/plc4x/plc4go/spi/pool"
@@ -156,7 +155,6 @@ func TestDiscoverer_Discover(t *testing.T) {
}
func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
- config.TraceDefaultMessageCodecWorker = true
type fields struct {
transportInstanceCreationQueue pool.Executor
deviceScanningQueue pool.Executor
@@ -200,6 +198,7 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
t.Logf("%d bytes written", write)
}()
t.Cleanup(func() {
+ t.Log("close listener")
if err := listen.Close(); err != nil {
t.Error(err)
}
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index 6e06449e3b..c731f181db 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -22,6 +22,7 @@ package config
import (
"github.com/apache/plc4x/plc4go/spi/options"
"github.com/rs/zerolog"
+ "time"
)
// TraceTransactionManagerWorkers when set to true the transaction manager displays worker states in log
@@ -49,6 +50,11 @@ func WithPassLoggerToModel(passLogger bool) WithOption {
return options.WithPassLoggerToModel(passLogger)
}
+// WithReceiveTimeout sets a timeout for a receive-operation (similar to SO_RCVTIMEO)
+func WithReceiveTimeout(timeout time.Duration) WithOption {
+ return options.WithReceiveTimeout(timeout)
+}
+
// WithTraceTransactionManagerWorkers enables trace transaction manager workers
func WithTraceTransactionManagerWorkers(traceWorkers bool) WithOption {
return options.WithTraceTransactionManagerWorkers(traceWorkers)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 9b6efddbff..21e202d48e 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -87,6 +87,7 @@ type defaultCodec struct {
stateChange sync.Mutex
activeWorker sync.WaitGroup
+ receiveTimeout time.Duration
traceDefaultMessageCodecWorker bool
log zerolog.Logger `ignore:"true"`
@@ -108,6 +109,7 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
defaultIncomingMessageChannel: make(chan spi.Message, 100),
expectations: []spi.Expectation{},
customMessageHandling: customMessageHandler,
+ receiveTimeout: options.ExtractReceiveTimeout(_options...),
traceDefaultMessageCodecWorker: options.ExtractTraceDefaultMessageCodecWorker(_options...) || config.TraceDefaultMessageCodecWorker,
log: options.ExtractCustomLogger(_options...),
}
@@ -164,6 +166,7 @@ func (m *defaultCodec) Disconnect() error {
m.running.Store(false)
m.log.Trace().Msg("Waiting for worker to shutdown")
m.activeWorker.Wait()
+ m.log.Trace().Msg("worker shut down")
if m.transportInstance != nil {
if err := m.transportInstance.Close(); err != nil {
return errors.Wrap(err, "error closing transport instance")
@@ -318,7 +321,25 @@ mainLoop:
workerLog.Trace().Msg("Receiving message")
// Check for incoming messages.
- message, err := m.Receive()
+ var message spi.Message
+ var err error
+ {
+ syncer := make(chan struct{})
+ go func() {
+ message, err = m.Receive()
+ close(syncer)
+ }()
+ timeoutTimer := time.NewTimer(m.receiveTimeout)
+ select {
+ case <-syncer:
+ utils.CleanupTimer(timeoutTimer)
+ case <-timeoutTimer.C:
+ utils.CleanupTimer(timeoutTimer)
+ workerLog.Error().Msgf("receive timeout after %s", m.receiveTimeout)
+ continue mainLoop
+ }
+
+ }
if err != nil {
workerLog.Error().Err(err).Msg("got an error reading from transport")
time.Sleep(10 * time.Millisecond)
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index 19bde057ac..47ee666559 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -249,7 +249,8 @@ func TestNewDefaultCodec(t *testing.T) {
{
name: "create it",
want: &defaultCodec{
- expectations: []spi.Expectation{},
+ expectations: []spi.Expectation{},
+ receiveTimeout: 10 * time.Second,
},
},
}
@@ -298,7 +299,8 @@ func Test_buildDefaultCodec(t *testing.T) {
{
name: "build it",
want: &defaultCodec{
- expectations: []spi.Expectation{},
+ expectations: []spi.Expectation{},
+ receiveTimeout: 10 * time.Second,
},
},
{
@@ -309,7 +311,8 @@ func Test_buildDefaultCodec(t *testing.T) {
},
},
want: &defaultCodec{
- expectations: []spi.Expectation{},
+ expectations: []spi.Expectation{},
+ receiveTimeout: 10 * time.Second,
},
},
}
@@ -897,6 +900,10 @@ func Test_defaultCodec_Work(t *testing.T) {
codec.running.Store(true)
codec.activeWorker.Add(1)
},
+ mockSetup: func(t *testing.T, fields *fields, args *args) {
+ requirements := NewMockDefaultCodecRequirements(t)
+ fields.DefaultCodecRequirements = requirements
+ },
},
{
name: "work hard (panics everywhere)",
@@ -934,6 +941,11 @@ func Test_defaultCodec_Work(t *testing.T) {
},
},
},
+ mockSetup: func(t *testing.T, fields *fields, args *args) {
+ requirements := NewMockDefaultCodecRequirements(t)
+ requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
+ fields.DefaultCodecRequirements = requirements
+ },
manipulator: func(t *testing.T, codec *defaultCodec) {
codec.running.Store(true)
codec.activeWorker.Add(1)
diff --git a/plc4go/spi/options/Option.go b/plc4go/spi/options/Option.go
index 3d5907b88c..e3dbc9fc48 100644
--- a/plc4go/spi/options/Option.go
+++ b/plc4go/spi/options/Option.go
@@ -22,6 +22,7 @@ package options
import (
"context"
"github.com/rs/zerolog"
+ "time"
)
// WithOption is a marker interface for options supplied by the builders like WithDefaultTtl
@@ -59,6 +60,22 @@ func WithPassLoggerToModel(passLogger bool) WithOption {
return withPassLoggerToModel{passLogger: passLogger}
}
+// WithReceiveTimeout sets a timeout for a receive-operation (similar to SO_RCVTIMEO)
+func WithReceiveTimeout(timeout time.Duration) WithOption {
+ return withReceiveTimeout{timeout: timeout}
+}
+
+// ExtractReceiveTimeout to extract the receive-timeout for reading operations. Defaults to 10 seconds
+func ExtractReceiveTimeout(options ...WithOption) time.Duration {
+ for _, option := range options {
+ switch option := option.(type) {
+ case withReceiveTimeout:
+ return option.timeout
+ }
+ }
+ return 10 * time.Second
+}
+
// ExtractPassLoggerToModel to extract the flag indicating that model should be passed to Model
func ExtractPassLoggerToModel(options ...WithOption) bool {
for _, option := range options {
@@ -167,6 +184,11 @@ type withPassLoggerToModel struct {
passLogger bool
}
+type withReceiveTimeout struct {
+ Option
+ timeout time.Duration
+}
+
type withTraceTransactionManagerWorkers struct {
Option
traceWorkers bool