You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/02 12:44:56 UTC

[plc4x] branch develop updated: test(plc4go/cbus): more tests for Reader

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new df293b2684 test(plc4go/cbus): more tests for Reader
df293b2684 is described below

commit df293b268459561445951e5ca64c7256784f98f3
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue May 2 14:44:48 2023 +0200

    test(plc4go/cbus): more tests for Reader
---
 plc4go/internal/cbus/Reader.go      | 202 ++++++++++++++++++------------------
 plc4go/internal/cbus/Reader_test.go | 103 ++++++++++++++++++
 2 files changed, 202 insertions(+), 103 deletions(-)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 41c59e6ee2..1ab96bdfdc 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -58,9 +58,8 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 	numTags := len(readRequest.GetTagNames())
 	if numTags > 20 { // letters g-z
 		result <- &spiModel.DefaultPlcReadRequestResult{
-			Request:  readRequest,
-			Response: nil,
-			Err:      errors.New("Only 20 tags can be handled at once"),
+			Request: readRequest,
+			Err:     errors.New("Only 20 tags can be handled at once"),
 		}
 		return
 	}
@@ -68,21 +67,15 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 	for _, tagName := range readRequest.GetTagNames() {
 		tag := readRequest.GetTag(tagName)
 		message, supportsRead, _, _, err := TagToCBusMessage(tag, nil, m.alphaGenerator, m.messageCodec)
-		if !supportsRead {
+		switch {
+		case err != nil:
 			result <- &spiModel.DefaultPlcReadRequestResult{
-				Request:  readRequest,
-				Response: nil,
-				Err:      errors.Wrapf(err, "Error encoding cbus message for tag %s. Tag is not meant to be read.", tagName),
-			}
-			return
-		}
-		if err != nil {
-			result <- &spiModel.DefaultPlcReadRequestResult{
-				Request:  readRequest,
-				Response: nil,
-				Err:      errors.Wrapf(err, "Error encoding cbus message for tag %s", tagName),
+				Request: readRequest,
+				Err:     errors.Wrapf(err, "Error encoding cbus message for tag %s", tagName),
 			}
 			return
+		case !supportsRead: // Note this should not be reachable
+			panic("this should not be possible as we always should then get the error above")
 		}
 		messages[tagName] = message
 	}
@@ -108,100 +101,103 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 			}
 			return
 		}
-		tagNameCopy := tagName
-		// Start a new request-transaction (Is ended in the response-handler)
-		transaction := m.tm.StartTransaction()
-		transaction.Submit(func() {
-			// Send the  over the wire
-			log.Trace().Msg("Send ")
-			if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
-				cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
-				if !ok {
-					return false
-				}
-				messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
-				if !ok {
-					return false
-				}
-				// Check if this errored
-				if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-					// This means we must handle this below
-					return true
-				}
+		m.sendMessage(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
+	}
+	readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
+	result <- &spiModel.DefaultPlcReadRequestResult{
+		Request:  readRequest,
+		Response: readResponse,
+	}
+}
 
-				confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-				if !ok {
-					return false
-				}
-				return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
-			}, func(receivedMessage spi.Message) error {
-				defer func(transaction spi.RequestTransaction) {
-					// This is just to make sure we don't forget to close the transaction here
-					_ = transaction.EndRequest()
-				}(transaction)
-				// Convert the response into an
-				log.Trace().Msg("convert response to ")
-				cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
-				messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
-				if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-					log.Trace().Msg("We got a server failure")
-					addResponseCode(tagNameCopy, apiModel.PlcResponseCode_INVALID_DATA)
-					return transaction.EndRequest()
-				}
-				replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-				if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
-					var responseCode apiModel.PlcResponseCode
-					switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
-					case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
-						responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
-					case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
-						responseCode = apiModel.PlcResponseCode_INVALID_DATA
-					case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
-						responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
-					case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
-						responseCode = apiModel.PlcResponseCode_INVALID_DATA
-					default:
-						return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
-					}
-					log.Trace().Msgf("Was no success %s:%v", tagNameCopy, responseCode)
-					addResponseCode(tagNameCopy, responseCode)
-					return transaction.EndRequest()
-				}
+func (m *Reader) sendMessage(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
+	// Start a new request-transaction (Is ended in the response-handler)
+	transaction := m.tm.StartTransaction()
+	transaction.Submit(func() {
+		// Send the  over the wire
+		log.Trace().Msg("Send ")
+		if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
+			cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
+			if !ok {
+				return false
+			}
+			messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+			if !ok {
+				return false
+			}
+			// Check if this errored
+			if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+				// This means we must handle this below
+				return true
+			}
 
-				alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
-				// TODO: it could be double confirmed but this is not implemented yet
-				embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-				if !ok {
-					log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
-					addResponseCode(tagNameCopy, apiModel.PlcResponseCode_NOT_FOUND)
-					return transaction.EndRequest()
+			confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+			if !ok {
+				return false
+			}
+			return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+		}, func(receivedMessage spi.Message) error {
+			defer func(transaction spi.RequestTransaction) {
+				// This is just to make sure we don't forget to close the transaction here
+				_ = transaction.EndRequest()
+			}(transaction)
+			// Convert the response into an
+			log.Trace().Msg("convert response to ")
+			cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
+			messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+			if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+				log.Trace().Msg("We got a server failure")
+				addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
+				return transaction.EndRequest()
+			}
+			replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+			if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+				var responseCode apiModel.PlcResponseCode
+				switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+					responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+					responseCode = apiModel.PlcResponseCode_INVALID_DATA
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+					responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
+				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+					responseCode = apiModel.PlcResponseCode_INVALID_DATA
+				default:
+					return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
 				}
+				log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+				addResponseCode(tagName, responseCode)
+				return transaction.EndRequest()
+			}
 
-				log.Trace().Msg("Handling confirmed data")
-				// TODO: check if we can use a plcValueSerializer
-				encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
-				if err := MapEncodedReply(transaction, encodedReply, tagNameCopy, addResponseCode, addPlcValue); err != nil {
-					return errors.Wrap(err, "error encoding reply")
-				}
+			alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+			// TODO: it could be double confirmed but this is not implemented yet
+			embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+			if !ok {
+				log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+				addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
 				return transaction.EndRequest()
-			}, func(err error) error {
-				addResponseCode(tagNameCopy, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
-				return transaction.FailRequest(err)
-			}, time.Second*1); err != nil {
-				log.Debug().Err(err).Msgf("Error sending message for tag %s", tagNameCopy)
-				addResponseCode(tagNameCopy, apiModel.PlcResponseCode_INTERNAL_ERROR)
-				if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
-					log.Debug().Err(err).Msg("Error failing request")
-				}
 			}
-		})
-		if err := transaction.AwaitCompletion(ctx); err != nil {
-			log.Warn().Err(err).Msg("Error while awaiting completion")
+
+			log.Trace().Msg("Handling confirmed data")
+			// TODO: check if we can use a plcValueSerializer
+			encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
+			if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+				return errors.Wrap(err, "error encoding reply")
+			}
+			return transaction.EndRequest()
+		}, func(err error) error {
+			addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
+			return transaction.FailRequest(err)
+		}, time.Second*1); err != nil {
+			log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
+			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+			if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
+				log.Debug().Err(err).Msg("Error failing request")
+			}
 		}
-	}
-	readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
-	result <- &spiModel.DefaultPlcReadRequestResult{
-		Request:  readRequest,
-		Response: readResponse,
+	})
+	if err := transaction.AwaitCompletion(ctx); err != nil {
+		log.Warn().Err(err).Msg("Error while awaiting completion")
 	}
 }
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index fdcbe24ab9..7bd2932a08 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -148,6 +148,55 @@ func TestReader_readSync(t *testing.T) {
 				return true
 			},
 		},
+		{
+			name: "unmapped tag",
+			fields: fields{
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					codec := NewMessageCodec(transportInstance)
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+				tm: spi.NewRequestTransactionManager(10),
+			},
+			args: args{
+				ctx: context.Background(),
+				readRequest: spiModel.NewDefaultPlcReadRequest(
+					map[string]apiModel.PlcTag{
+						"asd": nil,
+					},
+					[]string{
+						"asd",
+					},
+					nil,
+					nil,
+				),
+				result: make(chan apiModel.PlcReadRequestResult, 1),
+			},
+			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
+				timer := time.NewTimer(2 * time.Second)
+				defer timer.Stop()
+				select {
+				case <-timer.C:
+					t.Fail()
+				case result := <-results:
+					assert.NotNil(t, result.GetErr())
+				}
+				return true
+			},
+		},
 		{
 			name: "read something without any tag",
 			args: args{
@@ -249,6 +298,60 @@ func TestReader_readSync(t *testing.T) {
 				return true
 			},
 		},
+		{
+			name: "read identify type aborted",
+			fields: fields{
+				alphaGenerator: &AlphaGenerator{currentAlpha: 'g'},
+				messageCodec: func() *MessageCodec {
+					transport := test.NewTransport()
+					transportUrl := url.URL{Scheme: "test"}
+					transportInstance, err := transport.CreateTransportInstance(transportUrl, nil)
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					codec := NewMessageCodec(transportInstance)
+					err = codec.Connect()
+					if err != nil {
+						t.Error(err)
+						t.FailNow()
+						return nil
+					}
+					return codec
+				}(),
+				tm: spi.NewRequestTransactionManager(10),
+			},
+			args: args{
+				ctx: func() context.Context {
+					timeout, cancel := context.WithCancel(context.Background())
+					cancel()
+					return timeout
+				}(),
+				readRequest: spiModel.NewDefaultPlcReadRequest(
+					map[string]apiModel.PlcTag{
+						"blub": NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Type, 1),
+					},
+					[]string{
+						"blub",
+					},
+					nil,
+					nil,
+				),
+				result: make(chan apiModel.PlcReadRequestResult, 1),
+			},
+			resultEvaluator: func(t *testing.T, results chan apiModel.PlcReadRequestResult) bool {
+				timer := time.NewTimer(2 * time.Second)
+				defer timer.Stop()
+				select {
+				case <-timer.C:
+					t.Fail()
+				case result := <-results:
+					assert.NotNil(t, result.GetErr())
+				}
+				return true
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {