You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/02 13:33:04 UTC
[plc4x] 01/02: refactor(plc4go/cbus): split up reader into multiple methods
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit fcd62ce597efa89e4064711b2ba30f1581995034
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue May 2 15:20:44 2023 +0200
refactor(plc4go/cbus): split up reader into multiple methods
---
plc4go/internal/cbus/Reader.go | 166 +++++++++++++++++++++--------------------
1 file changed, 85 insertions(+), 81 deletions(-)
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 1ab96bdfdc..64a2baa771 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -101,7 +101,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
}
return
}
- m.sendMessage(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
+ m.createMessageTransactionAndWait(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
}
readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
result <- &spiModel.DefaultPlcReadRequestResult{
@@ -110,94 +110,98 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
}
}
-func (m *Reader) sendMessage(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
+func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
// Start a new request-transaction (Is ended in the response-handler)
transaction := m.tm.StartTransaction()
transaction.Submit(func() {
- // Send the over the wire
- log.Trace().Msg("Send ")
- if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
- cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
- if !ok {
- return false
- }
- messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
- if !ok {
- return false
- }
- // Check if this errored
- if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- // This means we must handle this below
- return true
- }
+ m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
+ })
+ if err := transaction.AwaitCompletion(ctx); err != nil {
+ log.Warn().Err(err).Msg("Error while awaiting completion")
+ }
+}
- confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !ok {
- return false
- }
- return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
- }, func(receivedMessage spi.Message) error {
- defer func(transaction spi.RequestTransaction) {
- // This is just to make sure we don't forget to close the transaction here
- _ = transaction.EndRequest()
- }(transaction)
- // Convert the response into an
- log.Trace().Msg("convert response to ")
- cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
- messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
- if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
- log.Trace().Msg("We got a server failure")
- addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
- return transaction.EndRequest()
- }
- replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
- if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
- var responseCode apiModel.PlcResponseCode
- switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
- responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
- responseCode = apiModel.PlcResponseCode_INVALID_DATA
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
- responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
- case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
- responseCode = apiModel.PlcResponseCode_INVALID_DATA
- default:
- return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
- }
- log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
- addResponseCode(tagName, responseCode)
- return transaction.EndRequest()
- }
+func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
+ // Send the over the wire
+ log.Trace().Msg("Send ")
+ if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
+ cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
+ if !ok {
+ return false
+ }
+ messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+ if !ok {
+ return false
+ }
+ // Check if this errored
+ if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+ // This means we must handle this below
+ return true
+ }
- alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
- // TODO: it could be double confirmed but this is not implemented yet
- embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
- if !ok {
- log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
- addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
- return transaction.EndRequest()
+ confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !ok {
+ return false
+ }
+ return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+ }, func(receivedMessage spi.Message) error {
+ defer func(transaction spi.RequestTransaction) {
+ // This is just to make sure we don't forget to close the transaction here
+ _ = transaction.EndRequest()
+ }(transaction)
+ // Convert the response into an
+ log.Trace().Msg("convert response to ")
+ cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
+ messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+ if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+ log.Trace().Msg("We got a server failure")
+ addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
+ return transaction.EndRequest()
+ }
+ replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+ if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+ var responseCode apiModel.PlcResponseCode
+ switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+ responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+ responseCode = apiModel.PlcResponseCode_INVALID_DATA
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+ responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
+ case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+ responseCode = apiModel.PlcResponseCode_INVALID_DATA
+ default:
+ return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
}
+ log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+ addResponseCode(tagName, responseCode)
+ return transaction.EndRequest()
+ }
- log.Trace().Msg("Handling confirmed data")
- // TODO: check if we can use a plcValueSerializer
- encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
- if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
- return errors.Wrap(err, "error encoding reply")
- }
+ alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+ // TODO: it could be double confirmed but this is not implemented yet
+ embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+ if !ok {
+ log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+ addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
return transaction.EndRequest()
- }, func(err error) error {
- addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
- return transaction.FailRequest(err)
- }, time.Second*1); err != nil {
- log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
- addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
- if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
- log.Debug().Err(err).Msg("Error failing request")
- }
}
- })
- if err := transaction.AwaitCompletion(ctx); err != nil {
- log.Warn().Err(err).Msg("Error while awaiting completion")
+
+ log.Trace().Msg("Handling confirmed data")
+ // TODO: check if we can use a plcValueSerializer
+ encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
+ if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+ return errors.Wrap(err, "error encoding reply")
+ }
+ return transaction.EndRequest()
+ }, func(err error) error {
+ addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
+ return transaction.FailRequest(err)
+ }, time.Second*1); err != nil {
+ log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
+ addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+ if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
+ log.Debug().Err(err).Msg("Error failing request")
+ }
}
}