You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/02 13:33:04 UTC

[plc4x] 01/02: refactor(plc4go/cbus): split up reader into multiple methods

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit fcd62ce597efa89e4064711b2ba30f1581995034
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue May 2 15:20:44 2023 +0200

    refactor(plc4go/cbus): split up reader into multiple methods
---
 plc4go/internal/cbus/Reader.go | 166 +++++++++++++++++++++--------------------
 1 file changed, 85 insertions(+), 81 deletions(-)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 1ab96bdfdc..64a2baa771 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -101,7 +101,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 			}
 			return
 		}
-		m.sendMessage(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
+		m.createMessageTransactionAndWait(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
 	}
 	readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
 	result <- &spiModel.DefaultPlcReadRequestResult{
@@ -110,94 +110,98 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 	}
 }
 
-func (m *Reader) sendMessage(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
+func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
 	// Start a new request-transaction (Is ended in the response-handler)
 	transaction := m.tm.StartTransaction()
 	transaction.Submit(func() {
-		// Send the  over the wire
-		log.Trace().Msg("Send ")
-		if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
-			cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
-			if !ok {
-				return false
-			}
-			messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
-			if !ok {
-				return false
-			}
-			// Check if this errored
-			if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-				// This means we must handle this below
-				return true
-			}
+		m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
+	})
+	if err := transaction.AwaitCompletion(ctx); err != nil {
+		log.Warn().Err(err).Msg("Error while awaiting completion")
+	}
+}
 
-			confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-			if !ok {
-				return false
-			}
-			return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
-		}, func(receivedMessage spi.Message) error {
-			defer func(transaction spi.RequestTransaction) {
-				// This is just to make sure we don't forget to close the transaction here
-				_ = transaction.EndRequest()
-			}(transaction)
-			// Convert the response into an
-			log.Trace().Msg("convert response to ")
-			cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
-			messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
-			if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-				log.Trace().Msg("We got a server failure")
-				addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
-				return transaction.EndRequest()
-			}
-			replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-			if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
-				var responseCode apiModel.PlcResponseCode
-				switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
-				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
-					responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
-				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
-					responseCode = apiModel.PlcResponseCode_INVALID_DATA
-				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
-					responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
-				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
-					responseCode = apiModel.PlcResponseCode_INVALID_DATA
-				default:
-					return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
-				}
-				log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
-				addResponseCode(tagName, responseCode)
-				return transaction.EndRequest()
-			}
+func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
+	// Send the  over the wire
+	log.Trace().Msg("Send ")
+	if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
+		cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
+		if !ok {
+			return false
+		}
+		messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+		if !ok {
+			return false
+		}
+		// Check if this errored
+		if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+			// This means we must handle this below
+			return true
+		}
 
-			alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
-			// TODO: it could be double confirmed but this is not implemented yet
-			embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-			if !ok {
-				log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
-				addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
-				return transaction.EndRequest()
+		confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+		if !ok {
+			return false
+		}
+		return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+	}, func(receivedMessage spi.Message) error {
+		defer func(transaction spi.RequestTransaction) {
+			// This is just to make sure we don't forget to close the transaction here
+			_ = transaction.EndRequest()
+		}(transaction)
+		// Convert the response into an
+		log.Trace().Msg("convert response to ")
+		cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
+		messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+		if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+			log.Trace().Msg("We got a server failure")
+			addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
+			return transaction.EndRequest()
+		}
+		replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+		if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+			var responseCode apiModel.PlcResponseCode
+			switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+				responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
+			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+				responseCode = apiModel.PlcResponseCode_INVALID_DATA
+			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+				responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
+			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+				responseCode = apiModel.PlcResponseCode_INVALID_DATA
+			default:
+				return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
 			}
+			log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+			addResponseCode(tagName, responseCode)
+			return transaction.EndRequest()
+		}
 
-			log.Trace().Msg("Handling confirmed data")
-			// TODO: check if we can use a plcValueSerializer
-			encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
-			if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
-				return errors.Wrap(err, "error encoding reply")
-			}
+		alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+		// TODO: it could be double confirmed but this is not implemented yet
+		embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+		if !ok {
+			log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+			addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
 			return transaction.EndRequest()
-		}, func(err error) error {
-			addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
-			return transaction.FailRequest(err)
-		}, time.Second*1); err != nil {
-			log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
-			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
-			if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
-				log.Debug().Err(err).Msg("Error failing request")
-			}
 		}
-	})
-	if err := transaction.AwaitCompletion(ctx); err != nil {
-		log.Warn().Err(err).Msg("Error while awaiting completion")
+
+		log.Trace().Msg("Handling confirmed data")
+		// TODO: check if we can use a plcValueSerializer
+		encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
+		if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+			return errors.Wrap(err, "error encoding reply")
+		}
+		return transaction.EndRequest()
+	}, func(err error) error {
+		addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
+		return transaction.FailRequest(err)
+	}, time.Second*1); err != nil {
+		log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
+		addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+		if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
+			log.Debug().Err(err).Msg("Error failing request")
+		}
 	}
 }