You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 16:46:22 UTC

[plc4x] 05/07: feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 8e0f2193356967a11df6e3e0b160b458aa8152b9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:05:29 2023 +0200

    feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained
    
    + set buffer to 100 to allow for a bit latency
---
 plc4go/spi/default/DefaultCodec.go | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 91c48d0ce9..78c3742f50 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -104,7 +104,7 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
 	return &defaultCodec{
 		DefaultCodecRequirements:      defaultCodecRequirements,
 		transportInstance:             transportInstance,
-		defaultIncomingMessageChannel: make(chan spi.Message),
+		defaultIncomingMessageChannel: make(chan spi.Message, 100),
 		expectations:                  []spi.Expectation{},
 		customMessageHandling:         customMessageHandler,
 		log:                           logger,
@@ -329,10 +329,14 @@ mainLoop:
 			time.Sleep(time.Millisecond * 10)
 			continue mainLoop
 		}
+		workerLog.Trace().Msgf("got message:\n%s", message)
 
 		if m.customMessageHandling != nil {
 			workerLog.Trace().Msg("Executing custom handling")
-			if m.customMessageHandling(codec, message) {
+			start := time.Now()
+			handled := m.customMessageHandling(codec, message)
+			workerLog.Trace().Msgf("custom handling took %s", time.Since(start))
+			if handled {
 				workerLog.Trace().Msg("Custom handling handled the message")
 				continue mainLoop
 			}
@@ -352,12 +356,9 @@ mainLoop:
 }
 
 func (m *defaultCodec) passToDefaultIncomingMessageChannel(workerLog zerolog.Logger, message spi.Message) {
-	timeout := time.NewTimer(time.Millisecond * 40)
-	defer utils.CleanupTimer(timeout)
 	select {
 	case m.defaultIncomingMessageChannel <- message:
-	case <-timeout.C:
-		timeout.Stop()
+	default:
 		workerLog.Warn().Msgf("Message discarded\n%s", message)
 	}
 }