You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 16:46:22 UTC
[plc4x] 05/07: feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 8e0f2193356967a11df6e3e0b160b458aa8152b9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 18:05:29 2023 +0200
feat(plc4go/spi): time custom message handling and don't block when default message channel is not being drained
+ set buffer to 100 to allow for a bit latency
---
plc4go/spi/default/DefaultCodec.go | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 91c48d0ce9..78c3742f50 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -104,7 +104,7 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
return &defaultCodec{
DefaultCodecRequirements: defaultCodecRequirements,
transportInstance: transportInstance,
- defaultIncomingMessageChannel: make(chan spi.Message),
+ defaultIncomingMessageChannel: make(chan spi.Message, 100),
expectations: []spi.Expectation{},
customMessageHandling: customMessageHandler,
log: logger,
@@ -329,10 +329,14 @@ mainLoop:
time.Sleep(time.Millisecond * 10)
continue mainLoop
}
+ workerLog.Trace().Msgf("got message:\n%s", message)
if m.customMessageHandling != nil {
workerLog.Trace().Msg("Executing custom handling")
- if m.customMessageHandling(codec, message) {
+ start := time.Now()
+ handled := m.customMessageHandling(codec, message)
+ workerLog.Trace().Msgf("custom handling took %s", time.Since(start))
+ if handled {
workerLog.Trace().Msg("Custom handling handled the message")
continue mainLoop
}
@@ -352,12 +356,9 @@ mainLoop:
}
func (m *defaultCodec) passToDefaultIncomingMessageChannel(workerLog zerolog.Logger, message spi.Message) {
- timeout := time.NewTimer(time.Millisecond * 40)
- defer utils.CleanupTimer(timeout)
select {
case m.defaultIncomingMessageChannel <- message:
- case <-timeout.C:
- timeout.Stop()
+ default:
workerLog.Warn().Msgf("Message discarded\n%s", message)
}
}