You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 15:05:37 UTC

[plc4x] branch develop updated: refactor(plc4go/cbus): move MMI handling to subscription handling

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6bd288c47 refactor(plc4go/cbus): move MMI handling to subscription handling
6bd288c47 is described below

commit 6bd288c47fc5dda23c503ba18510885dadb8a2fd
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 17:05:31 2022 +0200

    refactor(plc4go/cbus): move MMI handling to subscription handling
    
    - don't mark MMI/SAL messages as handled, so that a read request can still react to them
---
 plc4go/internal/cbus/Connection.go   |  48 +++++-----------
 plc4go/internal/cbus/MessageCodec.go | 104 +++++++++++++++++++----------------
 2 files changed, 72 insertions(+), 80 deletions(-)

diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 96c0ebfff..f339eed62 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -206,55 +206,37 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
 	}
 	c.fireConnected(ch)
 	c.startSubscriptionHandler()
-	c.startDefaultIncomingMessageHandler()
 }
 
-func (c *Connection) startDefaultIncomingMessageHandler() {
-	log.Debug().Msg("Starting default incoming message handler")
+func (c *Connection) startSubscriptionHandler() {
+	log.Debug().Msg("Starting SAL handler")
 	go func() {
-		log.Debug().Msg("default incoming message handler started")
+		log.Debug().Msg("SAL handler stated")
 		for c.IsConnected() {
-			for message := range c.messageCodec.GetDefaultIncomingMessageChannel() {
-				switch message := message.(type) {
-				case readWriteModel.CBusMessageToClientExactly:
-					switch reply := message.GetReply().(type) {
-					case readWriteModel.ReplyOrConfirmationReplyExactly:
-						switch reply := reply.GetReply().(type) {
-						case readWriteModel.ReplyEncodedReplyExactly:
-							switch encodedReply := reply.GetEncodedReply().(type) {
-							case readWriteModel.EncodedReplyCALReplyExactly:
-								for _, subscriber := range c.subscribers {
-									calReply := encodedReply.GetCalReply()
-									if ok := subscriber.handleMonitoredMMI(calReply); ok {
-										log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
-										continue
-									}
-								}
-							}
-						}
+			for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+				for _, subscriber := range c.subscribers {
+					if ok := subscriber.handleMonitoredSal(monitoredSal); ok {
+						log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
 					}
 				}
-				log.Debug().Msgf("Received unhandled \n%v", message)
 			}
 		}
-		log.Info().Msg("Ending default incoming message handler")
+		log.Info().Msg("Ending SAL handler")
 	}()
-}
-
-func (c *Connection) startSubscriptionHandler() {
-	log.Debug().Msg("Starting subscription handler")
+	log.Debug().Msg("Starting MMI handler")
 	go func() {
-		log.Debug().Msg("Subscription handler stated")
+		log.Debug().Msg("default MMI started")
 		for c.IsConnected() {
-			for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+			for calReply := range c.messageCodec.(*MessageCodec).monitoredMMIs {
 				for _, subscriber := range c.subscribers {
-					if ok := subscriber.handleMonitoredSal(monitoredSal); ok {
-						log.Debug().Msgf("%v handled\n%s", subscriber, monitoredSal)
+					if ok := subscriber.handleMonitoredMMI(calReply); ok {
+						log.Debug().Msgf("%v handled\n%s", subscriber, calReply)
+						continue
 					}
 				}
 			}
 		}
-		log.Info().Msg("Ending subscription handler")
+		log.Info().Msg("Ending MMI handler")
 	}()
 }
 
diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index 0c81027a6..ee5ed3a36 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -21,7 +21,7 @@ package cbus
 
 import (
 	"bufio"
-	readwriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/transports"
@@ -34,10 +34,11 @@ import (
 type MessageCodec struct {
 	_default.DefaultCodec
 
-	requestContext readwriteModel.RequestContext
-	cbusOptions    readwriteModel.CBusOptions
+	requestContext readWriteModel.RequestContext
+	cbusOptions    readWriteModel.CBusOptions
 
-	monitoredSALs   chan readwriteModel.MonitoredSAL
+	monitoredMMIs   chan readWriteModel.CALReply
+	monitoredSALs   chan readWriteModel.MonitoredSAL
 	lastPackageHash uint32
 	hashEncountered uint
 
@@ -46,27 +47,12 @@ type MessageCodec struct {
 
 func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
 	codec := &MessageCodec{
-		requestContext: readwriteModel.NewRequestContext(false),
-		cbusOptions:    readwriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false),
-		monitoredSALs:  make(chan readwriteModel.MonitoredSAL, 100),
+		requestContext: readWriteModel.NewRequestContext(false),
+		cbusOptions:    readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false),
+		monitoredMMIs:  make(chan readWriteModel.CALReply, 100),
+		monitoredSALs:  make(chan readWriteModel.MonitoredSAL, 100),
 	}
-	codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _default.WithCustomMessageHandler(func(codec _default.DefaultCodecRequirements, message spi.Message) bool {
-		switch message := message.(type) {
-		case readwriteModel.CBusMessageToClientExactly:
-			switch reply := message.GetReply().(type) {
-			case readwriteModel.ReplyOrConfirmationReplyExactly:
-				switch reply := reply.GetReply().(type) {
-				case readwriteModel.ReplyEncodedReplyExactly:
-					switch encodedReply := reply.GetEncodedReply().(type) {
-					case readwriteModel.MonitoredSALReplyExactly:
-						codec.(*MessageCodec).monitoredSALs <- encodedReply.GetMonitoredSAL()
-						return true
-					}
-				}
-			}
-		}
-		return false
-	}))
+	codec.DefaultCodec = _default.NewDefaultCodec(codec, transportInstance, _default.WithCustomMessageHandler(extractMMIAndSAL))
 	return codec
 }
 
@@ -77,7 +63,7 @@ func (m *MessageCodec) GetCodec() spi.MessageCodec {
 func (m *MessageCodec) Send(message spi.Message) error {
 	log.Trace().Msg("Sending message")
 	// Cast the message to the correct type of struct
-	cbusMessage := message.(readwriteModel.CBusMessage)
+	cbusMessage := message.(readWriteModel.CBusMessage)
 
 	// Set the right request context
 	m.requestContext = CreateRequestContext(cbusMessage)
@@ -105,16 +91,16 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 		if err := ti.FillBuffer(func(_ uint, currentByte byte, reader *bufio.Reader) bool {
 			switch currentByte {
 			case
-				readwriteModel.ResponseTermination_CR,
-				readwriteModel.ResponseTermination_LF:
+				readWriteModel.ResponseTermination_CR,
+				readWriteModel.ResponseTermination_LF:
 				return false
 			case
-				byte(readwriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL),
-				byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
-				byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
-				byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS),
-				byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
-				byte(readwriteModel.ConfirmationType_CHECKSUM_FAILURE):
+				byte(readWriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL),
+				byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
+				byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
+				byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS),
+				byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
+				byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE):
 				confirmation = true
 				return false
 			default:
@@ -141,9 +127,9 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 	}
 
 	// Check for an isolated error
-	if bytes, err := ti.PeekReadableBytes(1); err == nil && (bytes[0] == byte(readwriteModel.ConfirmationType_CHECKSUM_FAILURE)) {
+	if bytes, err := ti.PeekReadableBytes(1); err == nil && (bytes[0] == byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE)) {
 		_, _ = ti.Read(1)
-		return readwriteModel.CBusMessageParse(utils.NewReadBufferByteBased(bytes), true, m.requestContext, m.cbusOptions)
+		return readWriteModel.CBusMessageParse(utils.NewReadBufferByteBased(bytes), true, m.requestContext, m.cbusOptions)
 	}
 
 	peekedBytes, err := ti.PeekReadableBytes(readableBytes)
@@ -155,12 +141,12 @@ lookingForTheEnd:
 	for i, peekedByte := range peekedBytes {
 		switch peekedByte {
 		case
-			byte(readwriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL),
-			byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
-			byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
-			byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS),
-			byte(readwriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
-			byte(readwriteModel.ConfirmationType_CHECKSUM_FAILURE):
+			byte(readWriteModel.ConfirmationType_CONFIRMATION_SUCCESSFUL),
+			byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS),
+			byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION),
+			byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS),
+			byte(readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG),
+			byte(readWriteModel.ConfirmationType_CHECKSUM_FAILURE):
 			if indexOfConfirmation < 0 {
 				indexOfConfirmation = i
 			}
@@ -245,7 +231,7 @@ lookingForTheEnd:
 		if foundErrors > m.currentlyReportedServerErrors {
 			log.Debug().Msgf("We found %d errors in the current message. We have %d reported already", foundErrors, m.currentlyReportedServerErrors)
 			m.currentlyReportedServerErrors++
-			return readwriteModel.CBusMessageParse(utils.NewReadBufferByteBased([]byte{'!'}), true, m.requestContext, m.cbusOptions)
+			return readWriteModel.CBusMessageParse(utils.NewReadBufferByteBased([]byte{'!'}), true, m.requestContext, m.cbusOptions)
 		}
 		if foundErrors > 0 {
 			log.Debug().Msgf("We should have reported all errors by now (%d in total which we reported %d), so we resetting the count", foundErrors, m.currentlyReportedServerErrors)
@@ -272,12 +258,12 @@ lookingForTheEnd:
 		}
 	}
 	rb := utils.NewReadBufferByteBased(sanitizedInput)
-	cBusMessage, err := readwriteModel.CBusMessageParse(rb, pciResponse, m.requestContext, m.cbusOptions)
+	cBusMessage, err := readWriteModel.CBusMessageParse(rb, pciResponse, m.requestContext, m.cbusOptions)
 	if err != nil {
 		log.Debug().Err(err).Msg("First Parse Failed")
 		{ // Try SAL
 			rb := utils.NewReadBufferByteBased(sanitizedInput)
-			cBusMessage, secondErr := readwriteModel.CBusMessageParse(rb, pciResponse, readwriteModel.NewRequestContext(false), m.cbusOptions)
+			cBusMessage, secondErr := readWriteModel.CBusMessageParse(rb, pciResponse, readWriteModel.NewRequestContext(false), m.cbusOptions)
 			if secondErr == nil {
 				return cBusMessage, nil
 			} else {
@@ -285,10 +271,10 @@ lookingForTheEnd:
 			}
 		}
 		{ // Try MMI
-			requestContext := readwriteModel.NewRequestContext(false)
-			cbusOptions := readwriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
+			requestContext := readWriteModel.NewRequestContext(false)
+			cbusOptions := readWriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
 			rb := utils.NewReadBufferByteBased(sanitizedInput)
-			cBusMessage, secondErr := readwriteModel.CBusMessageParse(rb, true, requestContext, cbusOptions)
+			cBusMessage, secondErr := readWriteModel.CBusMessageParse(rb, true, requestContext, cbusOptions)
 			if secondErr == nil {
 				return cBusMessage, nil
 			} else {
@@ -302,3 +288,27 @@ lookingForTheEnd:
 	}
 	return cBusMessage, nil
 }
+
+func extractMMIAndSAL(codec _default.DefaultCodecRequirements, message spi.Message) bool {
+	switch message := message.(type) {
+	case readWriteModel.CBusMessageToClientExactly:
+		switch reply := message.GetReply().(type) {
+		case readWriteModel.ReplyOrConfirmationReplyExactly:
+			switch reply := reply.GetReply().(type) {
+			case readWriteModel.ReplyEncodedReplyExactly:
+				switch encodedReply := reply.GetEncodedReply().(type) {
+				case readWriteModel.MonitoredSALReplyExactly:
+					codec.(*MessageCodec).monitoredSALs <- encodedReply.GetMonitoredSAL()
+				case readWriteModel.EncodedReplyCALReplyExactly:
+					calData := encodedReply.GetCalReply().GetCalData()
+					switch calData.(type) {
+					case readWriteModel.CALDataStatusExactly, readWriteModel.CALDataStatusExtendedExactly:
+						codec.(*MessageCodec).monitoredMMIs <- encodedReply.GetCalReply()
+					}
+				}
+			}
+		}
+	}
+	// We never handle mmi or sal here as we might want to read them in a read-request too
+	return false
+}