You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 15:05:37 UTC
[plc4x] branch develop updated: refactor(plc4go/cbus): move MMI handling to subscription handling
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 6bd288c47 refactor(plc4go/cbus): move MMI handling to subscription handling
6bd288c47 is described below
commit 6bd288c47fc5dda23c503ba18510885dadb8a2fd
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 17:05:31 2022 +0200
refactor(plc4go/cbus): move MMI handling to subscription handling
- don't handle messages so we can react in a read to it
---
plc4go/internal/cbus/Connection.go | 48 +++++-----------
plc4go/internal/cbus/MessageCodec.go | 104 +++++++++++++++++++----------------
2 files changed, 72 insertions(+), 80 deletions(-)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 96c0ebfff..f339eed62 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -206,55 +206,37 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}
c.fireConnected(ch)
c.startSubscriptionHandler()
- c.startDefaultIncomingMessageHandler()
}
-func (c *Connection) startDefaultIncomingMessageHandler() {
- log.Debug().Msg("Starting default incoming message handler")
+func (c *Connection) startSubscriptionHandler() {
+ log.Debug().Msg("Starting SAL handler")
go func() {
- log.Debug().Msg("default incoming message handler started")
+ log.Debug().Msg("SAL handler stated")
for c.IsConnected() {
- for message := range c.messageCodec.GetDefaultIncomingMessageChannel() {
- switch message := message.(type) {
- case readWriteModel.CBusMessageToClientExactly:
- switch reply := message.GetReply().(type) {
- case readWriteModel.ReplyOrConfirmationReplyExactly:
- switch reply := reply.GetReply().(type) {
- case readWriteModel.ReplyEncodedReplyExactly:
- switch encodedReply := reply.GetEncodedReply().(type) {
- case readWriteModel.EncodedReplyCALReplyExactly:
- for _, subscriber := range c.subscribers {
- calReply := encodedReply.GetCalReply()
- if ok := subscriber.handleMonitoredMMI(calReply); ok {
- log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
- continue
- }
- }
- }
- }
+ for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+ for _, subscriber := range c.subscribers {
+ if ok := subscriber.handleMonitoredSal(monitoredSal); ok {
+ log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
}
}
- log.Debug().Msgf("Received unhandled \n%v", message)
}
}
- log.Info().Msg("Ending default incoming message handler")
+ log.Info().Msg("Ending SAL handler")
}()
-}
-
-func (c *Connection) startSubscriptionHandler() {
- log.Debug().Msg("Starting subscription handler")
+ log.Debug().Msg("Starting MMI handler")
go func() {
- log.Debug().Msg("Subscription handler stated")
+ log.Debug().Msg("default MMI started")
for c.IsConnected() {
- for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+ for calReply := range c.messageCodec.(*MessageCodec).monitoredMMIs {
for _, subscriber := range c.subscribers {
- if ok := subscriber.handleMonitoredSal(monitoredSal); ok {
- log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
+ if ok := subscriber.handleMonitoredMMI(calReply); ok {
+ log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
+ continue
}
}
}
}
- log.Info().Msg("Ending subscription handler")
+ log.Info().Msg("Ending MMI handler")
}()
}
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 0c81027a6..ee5ed3a36 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -21,7 +21,7 @@ package cbus
import (
"bufio"
- readwriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
@@ -34,10 +34,11 @@ import (
type MessageCodec struct {
_default.DefaultCodec
- requestContext readwriteModel.RequestContext
- cbusOptions readwriteModel.CBusOptions
+ requestContext readWriteModel.RequestContext
+ cbusOptions readWriteModel.CBusOptions
- monitoredSALs chan readwriteModel.MonitoredSAL
+ monitoredMMIs chan readWriteModel.CALReply
+ monitoredSALs chan readWriteModel.MonitoredSAL
lastPackageHash uint32
hashEncountered uint
@@ -46,27 +47,12 @@ type MessageCodec struct {
func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
codec := &MessageCodec{
- requestContext: readwriteModel.NewRequestContext(false),
- cbusOptions: readwriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false),
- monitoredSALs: make(chan readwriteModel.MonitoredSAL, 100),
+ requestContext: readWriteModel.NewRequestContext(false),
+ cbusOptions: readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false),
+ monitoredMMIs: make(chan readWriteModel.CALReply, 100),
+ monitoredSALs: make(chan readWriteModel.MonitoredSAL, 100),
}
- codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _default.WithCustomMessageHandler(func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
- switch message := message.(type) {
- case readwriteModel.CBusMessageToClientExactly:
- switch reply := message.GetReply().(type) {
- case readwriteModel.ReplyOrConfirmationReplyExactly:
- switch reply := reply.GetReply().(type) {
- case readwriteModel.ReplyEncodedReplyExactly:
- switch encodedReply := reply.GetEncodedReply().(type) {
- case readwriteModel.MonitoredSALReplyExactly:
- codec.(*MessageCodec).monitoredSALs <- encodedReply.GetMonitoredSAL()
- return true
- }
- }
- }
- }
- return false
- }))
+ codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _default.WithCustomMessageHandler(extractMMIAndSAL))
return codec
}
@@ -77,7 +63,7 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec {
func (m *MessageCodec) Send(message spi.Message) error {
log.Trace().Msg("Sending message")
// Cast the message to the correct type of struct
- cbusMessage := message.(readwriteModel.CBusMessage)
+ cbusMessage := message.(readWriteModel.CBusMessage)
// Set the right request context
m.requestContext = CreateRequestContext(cbusMessage)
@@ -105,16 +91,16 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
if err := ti.FillBuffer(func(_ uint, currentByte byte, reader *bufio.Reader) bool {
switch currentByte {
case
- readwriteModel.ResponseTermination_CR,
- readwriteModel.ResponseTermination_LF:
+ readWriteModel.ResponseTermination_CR,
+ readWriteModel.ResponseTermination_LF:
return false
case
- byte(readwriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL),
- byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
- byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
- byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS),
- byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
- byte(readwriteModel.ConfirmationType_CHECKSUM_FAILURE):
+ byte(readWriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL),
+ byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
+ byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
+ byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS),
+ byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
+ byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE):
confirmation = true
return false
default:
@@ -141,9 +127,9 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
}
// Check for an isolated error
- if bytes, err := ti.PeekReadableBytes(1); err == nil && (bytes[0] == byte(readwriteModel.ConfirmationType_CHECKSUM_FAILURE)) {
+ if bytes, err := ti.PeekReadableBytes(1); err == nil && (bytes[0] == byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE)) {
_, _ = ti.Read(1)
- return readwriteModel.CBusMessageParse(utils.NewReadBufferByteBased(bytes), true, m.requestContext, m.cbusOptions)
+ return readWriteModel.CBusMessageParse(utils.NewReadBufferByteBased(bytes), true, m.requestContext, m.cbusOptions)
}
peekedBytes, err := ti.PeekReadableBytes(readableBytes)
@@ -155,12 +141,12 @@ lookingForTheEnd:
for i, peekedByte := range peekedBytes {
switch peekedByte {
case
- byte(readwriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL),
- byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
- byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
- byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS),
- byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
- byte(readwriteModel.ConfirmationType_CHECKSUM_FAILURE):
+ byte(readWriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL),
+ byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
+ byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
+ byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS),
+ byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
+ byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE):
if indexOfConfirmation < 0 {
indexOfConfirmation = i
}
@@ -245,7 +231,7 @@ lookingForTheEnd:
if foundErrors > m.currentlyReportedServerErrors {
log.Debug().Msgf("We found %d errors in the current message. We have %d reported already", foundErrors, m.currentlyReportedServerErrors)
m.currentlyReportedServerErrors++
- return readwriteModel.CBusMessageParse(utils.NewReadBufferByteBased([]byte{'!'}), true, m.requestContext, m.cbusOptions)
+ return readWriteModel.CBusMessageParse(utils.NewReadBufferByteBased([]byte{'!'}), true, m.requestContext, m.cbusOptions)
}
if foundErrors > 0 {
log.Debug().Msgf("We should have reported all errors by now (%d in total which we reported %d), so we resetting the count", foundErrors, m.currentlyReportedServerErrors)
@@ -272,12 +258,12 @@ lookingForTheEnd:
}
}
rb := utils.NewReadBufferByteBased(sanitizedInput)
- cBusMessage, err := readwriteModel.CBusMessageParse(rb, pciResponse, m.requestContext, m.cbusOptions)
+ cBusMessage, err := readWriteModel.CBusMessageParse(rb, pciResponse, m.requestContext, m.cbusOptions)
if err != nil {
log.Debug().Err(err).Msg("First Parse Failed")
{ // Try SAL
rb := utils.NewReadBufferByteBased(sanitizedInput)
- cBusMessage, secondErr := readwriteModel.CBusMessageParse(rb, pciResponse, readwriteModel.NewRequestContext(false), m.cbusOptions)
+ cBusMessage, secondErr := readWriteModel.CBusMessageParse(rb, pciResponse, readWriteModel.NewRequestContext(false), m.cbusOptions)
if secondErr == nil {
return cBusMessage, nil
} else {
@@ -285,10 +271,10 @@ lookingForTheEnd:
}
}
{ // Try MMI
- requestContext := readwriteModel.NewRequestContext(false)
- cbusOptions := readwriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
+ requestContext := readWriteModel.NewRequestContext(false)
+ cbusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
rb := utils.NewReadBufferByteBased(sanitizedInput)
- cBusMessage, secondErr := readwriteModel.CBusMessageParse(rb, true, requestContext, cbusOptions)
+ cBusMessage, secondErr := readWriteModel.CBusMessageParse(rb, true, requestContext, cbusOptions)
if secondErr == nil {
return cBusMessage, nil
} else {
@@ -302,3 +288,27 @@ lookingForTheEnd:
}
return cBusMessage, nil
}
+
+func extractMMIAndSAL(codec _default.DefaultCodecRequirements, message spi.Message) bool {
+ switch message := message.(type) {
+ case readWriteModel.CBusMessageToClientExactly:
+ switch reply := message.GetReply().(type) {
+ case readWriteModel.ReplyOrConfirmationReplyExactly:
+ switch reply := reply.GetReply().(type) {
+ case readWriteModel.ReplyEncodedReplyExactly:
+ switch encodedReply := reply.GetEncodedReply().(type) {
+ case readWriteModel.MonitoredSALReplyExactly:
+ codec.(*MessageCodec).monitoredSALs <- encodedReply.GetMonitoredSAL()
+ case readWriteModel.EncodedReplyCALReplyExactly:
+ calData := encodedReply.GetCalReply().GetCalData()
+ switch calData.(type) {
+ case readWriteModel.CALDataStatusExactly, readWriteModel.CALDataStatusExtendedExactly:
+ codec.(*MessageCodec).monitoredMMIs <- encodedReply.GetCalReply()
+ }
+ }
+ }
+ }
+ }
+ // We never handle mmi or sal here as we might want to read them in a read-request too
+ return false
+}