You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 21:35:44 UTC

[plc4x] 02/02: feat(plc4go): add receive timeout option

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 5eca78479cd542acbf0478f87606adf47798e6ea
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:35:34 2023 +0200

    feat(plc4go): add receive timeout option
---
 plc4go/internal/cbus/Discoverer_test.go |  3 +--
 plc4go/pkg/api/config/config.go         |  6 ++++++
 plc4go/spi/default/DefaultCodec.go      | 23 ++++++++++++++++++++++-
 plc4go/spi/default/DefaultCodec_test.go | 18 +++++++++++++++---
 plc4go/spi/options/Option.go            | 22 ++++++++++++++++++++++
 5 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 544ac2d843..9e8270929e 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -29,7 +29,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/apache/plc4x/plc4go/pkg/api/config"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
@@ -156,7 +155,6 @@ func TestDiscoverer_Discover(t *testing.T) {
 }
 
 func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
-	config.TraceDefaultMessageCodecWorker = true
 	type fields struct {
 		transportInstanceCreationQueue pool.Executor
 		deviceScanningQueue            pool.Executor
@@ -200,6 +198,7 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 					t.Logf("%d bytes written", write)
 				}()
 				t.Cleanup(func() {
+					t.Log("close listener")
 					if err := listen.Close(); err != nil {
 						t.Error(err)
 					}
diff --git a/plc4go/pkg/api/config/config.go b/plc4go/pkg/api/config/config.go
index 6e06449e3b..c731f181db 100644
--- a/plc4go/pkg/api/config/config.go
+++ b/plc4go/pkg/api/config/config.go
@@ -22,6 +22,7 @@ package config
 import (
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/rs/zerolog"
+	"time"
 )
 
 // TraceTransactionManagerWorkers when set to true the transaction manager displays worker states in log
@@ -49,6 +50,11 @@ func WithPassLoggerToModel(passLogger bool) WithOption {
 	return options.WithPassLoggerToModel(passLogger)
 }
 
+// WithReceiveTimeout sets a timeout for a receive-operation (similar to SO_RCVTIMEO)
+func WithReceiveTimeout(timeout time.Duration) WithOption {
+	return options.WithReceiveTimeout(timeout)
+}
+
 // WithTraceTransactionManagerWorkers enables trace transaction manager workers
 func WithTraceTransactionManagerWorkers(traceWorkers bool) WithOption {
 	return options.WithTraceTransactionManagerWorkers(traceWorkers)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 9b6efddbff..21e202d48e 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -87,6 +87,7 @@ type defaultCodec struct {
 	stateChange             sync.Mutex
 	activeWorker            sync.WaitGroup
 
+	receiveTimeout                 time.Duration
 	traceDefaultMessageCodecWorker bool
 
 	log zerolog.Logger `ignore:"true"`
@@ -108,6 +109,7 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
 		defaultIncomingMessageChannel:  make(chan spi.Message, 100),
 		expectations:                   []spi.Expectation{},
 		customMessageHandling:          customMessageHandler,
+		receiveTimeout:                 options.ExtractReceiveTimeout(_options...),
 		traceDefaultMessageCodecWorker: options.ExtractTraceDefaultMessageCodecWorker(_options...) || config.TraceDefaultMessageCodecWorker,
 		log:                            options.ExtractCustomLogger(_options...),
 	}
@@ -164,6 +166,7 @@ func (m *defaultCodec) Disconnect() error {
 	m.running.Store(false)
 	m.log.Trace().Msg("Waiting for worker to shutdown")
 	m.activeWorker.Wait()
+	m.log.Trace().Msg("worker shut down")
 	if m.transportInstance != nil {
 		if err := m.transportInstance.Close(); err != nil {
 			return errors.Wrap(err, "error closing transport instance")
@@ -318,7 +321,25 @@ mainLoop:
 
 		workerLog.Trace().Msg("Receiving message")
 		// Check for incoming messages.
-		message, err := m.Receive()
+		var message spi.Message
+		var err error
+		{
+			syncer := make(chan struct{})
+			go func() {
+				message, err = m.Receive()
+				close(syncer)
+			}()
+			timeoutTimer := time.NewTimer(m.receiveTimeout)
+			select {
+			case <-syncer:
+				utils.CleanupTimer(timeoutTimer)
+			case <-timeoutTimer.C:
+				utils.CleanupTimer(timeoutTimer)
+				workerLog.Error().Msgf("receive timeout after %s", m.receiveTimeout)
+				continue mainLoop
+			}
+
+		}
 		if err != nil {
 			workerLog.Error().Err(err).Msg("got an error reading from transport")
 			time.Sleep(10 * time.Millisecond)
diff --git a/plc4go/spi/default/DefaultCodec_test.go b/plc4go/spi/default/DefaultCodec_test.go
index 19bde057ac..47ee666559 100644
--- a/plc4go/spi/default/DefaultCodec_test.go
+++ b/plc4go/spi/default/DefaultCodec_test.go
@@ -249,7 +249,8 @@ func TestNewDefaultCodec(t *testing.T) {
 		{
 			name: "create it",
 			want: &defaultCodec{
-				expectations: []spi.Expectation{},
+				expectations:   []spi.Expectation{},
+				receiveTimeout: 10 * time.Second,
 			},
 		},
 	}
@@ -298,7 +299,8 @@ func Test_buildDefaultCodec(t *testing.T) {
 		{
 			name: "build it",
 			want: &defaultCodec{
-				expectations: []spi.Expectation{},
+				expectations:   []spi.Expectation{},
+				receiveTimeout: 10 * time.Second,
 			},
 		},
 		{
@@ -309,7 +311,8 @@ func Test_buildDefaultCodec(t *testing.T) {
 				},
 			},
 			want: &defaultCodec{
-				expectations: []spi.Expectation{},
+				expectations:   []spi.Expectation{},
+				receiveTimeout: 10 * time.Second,
 			},
 		},
 	}
@@ -897,6 +900,10 @@ func Test_defaultCodec_Work(t *testing.T) {
 				codec.running.Store(true)
 				codec.activeWorker.Add(1)
 			},
+			mockSetup: func(t *testing.T, fields *fields, args *args) {
+				requirements := NewMockDefaultCodecRequirements(t)
+				fields.DefaultCodecRequirements = requirements
+			},
 		},
 		{
 			name: "work hard (panics everywhere)",
@@ -934,6 +941,11 @@ func Test_defaultCodec_Work(t *testing.T) {
 					},
 				},
 			},
+			mockSetup: func(t *testing.T, fields *fields, args *args) {
+				requirements := NewMockDefaultCodecRequirements(t)
+				requirements.EXPECT().Receive().Return(nil, errors.New("nope"))
+				fields.DefaultCodecRequirements = requirements
+			},
 			manipulator: func(t *testing.T, codec *defaultCodec) {
 				codec.running.Store(true)
 				codec.activeWorker.Add(1)
diff --git a/plc4go/spi/options/Option.go b/plc4go/spi/options/Option.go
index 3d5907b88c..e3dbc9fc48 100644
--- a/plc4go/spi/options/Option.go
+++ b/plc4go/spi/options/Option.go
@@ -22,6 +22,7 @@ package options
 import (
 	"context"
 	"github.com/rs/zerolog"
+	"time"
 )
 
 // WithOption is a marker interface for options supplied by the builders like WithDefaultTtl
@@ -59,6 +60,22 @@ func WithPassLoggerToModel(passLogger bool) WithOption {
 	return withPassLoggerToModel{passLogger: passLogger}
 }
 
+// WithReceiveTimeout sets a timeout for a receive-operation (similar to SO_RCVTIMEO)
+func WithReceiveTimeout(timeout time.Duration) WithOption {
+	return withReceiveTimeout{timeout: timeout}
+}
+
+// ExtractReceiveTimeout to extract the receive-timeout for reading operations. Defaults to 10 seconds
+func ExtractReceiveTimeout(options ...WithOption) time.Duration {
+	for _, option := range options {
+		switch option := option.(type) {
+		case withReceiveTimeout:
+			return option.timeout
+		}
+	}
+	return 10 * time.Second
+}
+
 // ExtractPassLoggerToModel to extract the flag indicating that model should be passed to Model
 func ExtractPassLoggerToModel(options ...WithOption) bool {
 	for _, option := range options {
@@ -167,6 +184,11 @@ type withPassLoggerToModel struct {
 	passLogger bool
 }
 
+type withReceiveTimeout struct {
+	Option
+	timeout time.Duration
+}
+
 type withTraceTransactionManagerWorkers struct {
 	Option
 	traceWorkers bool