You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/10 10:17:05 UTC

[plc4x] 02/02: fix(plc4go/cbus): change handling of error responses

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit aae82e92772b97fae0d7aa48555d1a1a0d8542fe
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 10 12:16:55 2022 +0200

    fix(plc4go/cbus): change handling of error responses
---
 plc4go/internal/cbus/MessageCodec.go               | 102 ++++++++++++++++-----
 .../knxnetip/readwrite/model/KnxManufacturer.go    |  26 +++++-
 2 files changed, 100 insertions(+), 28 deletions(-)

diff --git a/plc4go/internal/cbus/MessageCodec.go b/plc4go/internal/cbus/MessageCodec.go
index e83bcf42e..45ae578aa 100644
--- a/plc4go/internal/cbus/MessageCodec.go
+++ b/plc4go/internal/cbus/MessageCodec.go
@@ -40,6 +40,8 @@ type MessageCodec struct {
 	monitoredSALs   chan readwriteModel.MonitoredSAL
 	lastPackageHash uint32
 	hashEncountered uint
+
+	currentlyReportedServerErrors uint
 }
 
 func NewMessageCodec(transportInstance transports.TransportInstance) *MessageCodec {
@@ -99,26 +101,37 @@ func (m *MessageCodec) Receive() (spi.Message, error) {
 	log.Trace().Msg("receiving")
 
 	ti := m.GetTransportInstance()
-	if err := ti.FillBuffer(func(_ uint, currentByte byte, reader *bufio.Reader) bool {
-		hitCr := currentByte == '\r'
-		if hitCr {
-			// Make sure we peek one more
-			_, _ = reader.Peek(1)
-			return false
+	// Fill the buffer
+	{
+		if err := ti.FillBuffer(func(_ uint, currentByte byte, reader *bufio.Reader) bool {
+			hitCr := currentByte == '\r'
+			if hitCr {
+				// Make sure we peek one more
+				_, _ = reader.Peek(1)
+				return false
+			}
+			return true
+		}); err != nil {
+			return nil, err
 		}
-		return true
-	}); err != nil {
-		return nil, err
 	}
-	readableBytes, err := ti.GetNumBytesAvailableInBuffer()
-	if err != nil {
-		log.Warn().Err(err).Msg("Got error reading")
-		return nil, nil
-	}
-	if readableBytes == 0 {
-		log.Trace().Msg("Nothing to read")
-		return nil, nil
+
+	// Check how many readable bytes we have
+	var readableBytes uint32
+	{
+		numBytesAvailableInBuffer, err := ti.GetNumBytesAvailableInBuffer()
+		if err != nil {
+			log.Warn().Err(err).Msg("Got error reading")
+			return nil, nil
+		}
+		if numBytesAvailableInBuffer == 0 {
+			log.Trace().Msg("Nothing to read")
+			return nil, nil
+		}
+		readableBytes = numBytesAvailableInBuffer
 	}
+
+	// Check for an isolated error
 	if bytes, err := ti.PeekReadableBytes(1); err != nil && (bytes[0] == '!') {
 		_, _ = ti.Read(1)
 		return readwriteModel.CBusMessageParse(utils.NewReadBufferByteBased(bytes), true, m.requestContext, m.cbusOptions)
@@ -179,6 +192,7 @@ lookingForTheEnd:
 		return nil, nil
 	}
 
+	// Build length
 	packetLength := indexOfCR + 1
 	if pciResponse {
 		packetLength = indexOfLF + 1
@@ -189,16 +203,58 @@ lookingForTheEnd:
 		panic("Invalid state... Can not be response and request at the same time")
 	}
 
-	read, err := ti.Read(uint32(packetLength))
-	if err != nil {
-		panic("Invalid state... If we have peeked that before we should be able to read that now")
+	// We need to ensure that there is no '!' before the first \r
+	{
+		peekedBytes, err := ti.PeekReadableBytes(readableBytes)
+		if err != nil {
+			return nil, err
+		}
+		// We check in the current stream for reported errors
+		foundErrors := uint(0)
+		for _, peekedByte := range peekedBytes {
+			if peekedByte == '!' {
+				foundErrors++
+			}
+			if peekedByte == '\r' {
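+				// Intent: only look for errors up to the first \r. NOTE(review): this branch is empty, so the loop keeps scanning past it — confirm whether a break is missing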
+			}
+		}
+		// Now we report the errors one by one so for every request we get a proper rejection
+		if foundErrors > m.currentlyReportedServerErrors {
+			log.Debug().Msgf("We found %d errors in the current message. We have %d reported already", foundErrors, m.currentlyReportedServerErrors)
+			m.currentlyReportedServerErrors++
+			return readwriteModel.CBusMessageParse(utils.NewReadBufferByteBased([]byte{'!'}), true, m.requestContext, m.cbusOptions)
+		}
+		if foundErrors > 0 {
+			log.Debug().Msgf("We should have reported all errors by now (%d in total which we reported %d), so we resetting the count", foundErrors, m.currentlyReportedServerErrors)
+			m.currentlyReportedServerErrors = 0
+		}
+		log.Trace().Msgf("currentlyReportedServerErrors %d should be 0", m.currentlyReportedServerErrors)
+	}
+
+	var rawInput []byte
+	{
+		read, err := ti.Read(uint32(packetLength))
+		if err != nil {
+			panic("Invalid state... If we have peeked that before we should be able to read that now")
+		}
+		rawInput = read
+	}
+	var sanitizedInput []byte
+	// We remove every error marker we find
+	{
+		for _, b := range rawInput {
+			if b != '!' {
+				sanitizedInput = append(sanitizedInput, b)
+			}
+		}
 	}
-	rb := utils.NewReadBufferByteBased(read)
+	rb := utils.NewReadBufferByteBased(sanitizedInput)
 	cBusMessage, err := readwriteModel.CBusMessageParse(rb, pciResponse, m.requestContext, m.cbusOptions)
 	if err != nil {
 		log.Debug().Err(err).Msg("First Parse Failed")
 		{ // Try SAL
-			rb := utils.NewReadBufferByteBased(read)
+			rb := utils.NewReadBufferByteBased(sanitizedInput)
 			cBusMessage, secondErr := readwriteModel.CBusMessageParse(rb, pciResponse, readwriteModel.NewRequestContext(false), m.cbusOptions)
 			if secondErr == nil {
 				return cBusMessage, nil
@@ -209,7 +265,7 @@ lookingForTheEnd:
 		{ // Try MMI
 			requestContext := readwriteModel.NewRequestContext(false)
 			cbusOptions := readwriteModel.NewCBusOptions(false, false, false, false, false, false, false, false, false)
-			rb := utils.NewReadBufferByteBased(read)
+			rb := utils.NewReadBufferByteBased(sanitizedInput)
 			cBusMessage, secondErr := readwriteModel.CBusMessageParse(rb, true, requestContext, cbusOptions)
 			if secondErr == nil {
 				return cBusMessage, nil
diff --git a/plc4go/protocols/knxnetip/readwrite/model/KnxManufacturer.go b/plc4go/protocols/knxnetip/readwrite/model/KnxManufacturer.go
index f02aad83a..8ab5a790f 100644
--- a/plc4go/protocols/knxnetip/readwrite/model/KnxManufacturer.go
+++ b/plc4go/protocols/knxnetip/readwrite/model/KnxManufacturer.go
@@ -631,8 +631,9 @@ const (
 	KnxManufacturer_M_GORDIC                                             KnxManufacturer = 591
 	KnxManufacturer_M_DELTA_ELECTRONICS                                  KnxManufacturer = 592
 	KnxManufacturer_M_SHANGHAI_LEWIN_INTELLIGENT_TECHNOLOGY_CO__LTD_     KnxManufacturer = 593
-	KnxManufacturer_M_ABB___RESERVED                                     KnxManufacturer = 594
-	KnxManufacturer_M_BUSCH_JAEGER_ELEKTRO___RESERVED                    KnxManufacturer = 595
+	KnxManufacturer_M_KG_POWER                                           KnxManufacturer = 594
+	KnxManufacturer_M_ABB___RESERVED                                     KnxManufacturer = 595
+	KnxManufacturer_M_BUSCH_JAEGER_ELEKTRO___RESERVED                    KnxManufacturer = 596
 )
 
 var KnxManufacturerValues []KnxManufacturer
@@ -1234,6 +1235,7 @@ func init() {
 		KnxManufacturer_M_GORDIC,
 		KnxManufacturer_M_DELTA_ELECTRONICS,
 		KnxManufacturer_M_SHANGHAI_LEWIN_INTELLIGENT_TECHNOLOGY_CO__LTD_,
+		KnxManufacturer_M_KG_POWER,
 		KnxManufacturer_M_ABB___RESERVED,
 		KnxManufacturer_M_BUSCH_JAEGER_ELEKTRO___RESERVED,
 	}
@@ -3443,10 +3445,14 @@ func (e KnxManufacturer) Number() uint16 {
 		}
 	case 594:
 		{ /* '594' */
-			return 43954
+			return 652
 		}
 	case 595:
 		{ /* '595' */
+			return 43954
+		}
+	case 596:
+		{ /* '596' */
 			return 43959
 		}
 	case 6:
@@ -5845,10 +5851,14 @@ func (e KnxManufacturer) Name() string {
 		}
 	case 594:
 		{ /* '594' */
-			return "ABB - reserved"
+			return "KG-POWER"
 		}
 	case 595:
 		{ /* '595' */
+			return "ABB - reserved"
+		}
+	case 596:
+		{ /* '596' */
 			return "Busch-Jaeger Elektro - reserved"
 		}
 	case 6:
@@ -7145,8 +7155,10 @@ func KnxManufacturerByValue(value uint16) (enum KnxManufacturer, ok bool) {
 	case 593:
 		return KnxManufacturer_M_SHANGHAI_LEWIN_INTELLIGENT_TECHNOLOGY_CO__LTD_, true
 	case 594:
-		return KnxManufacturer_M_ABB___RESERVED, true
+		return KnxManufacturer_M_KG_POWER, true
 	case 595:
+		return KnxManufacturer_M_ABB___RESERVED, true
+	case 596:
 		return KnxManufacturer_M_BUSCH_JAEGER_ELEKTRO___RESERVED, true
 	case 6:
 		return KnxManufacturer_M_BUSCH_JAEGER_ELEKTRO, true
@@ -8342,6 +8354,8 @@ func KnxManufacturerByName(value string) (enum KnxManufacturer, ok bool) {
 		return KnxManufacturer_M_DELTA_ELECTRONICS, true
 	case "M_SHANGHAI_LEWIN_INTELLIGENT_TECHNOLOGY_CO__LTD_":
 		return KnxManufacturer_M_SHANGHAI_LEWIN_INTELLIGENT_TECHNOLOGY_CO__LTD_, true
+	case "M_KG_POWER":
+		return KnxManufacturer_M_KG_POWER, true
 	case "M_ABB___RESERVED":
 		return KnxManufacturer_M_ABB___RESERVED, true
 	case "M_BUSCH_JAEGER_ELEKTRO___RESERVED":
@@ -9585,6 +9599,8 @@ func (e KnxManufacturer) PLC4XEnumName() string {
 		return "M_DELTA_ELECTRONICS"
 	case KnxManufacturer_M_SHANGHAI_LEWIN_INTELLIGENT_TECHNOLOGY_CO__LTD_:
 		return "M_SHANGHAI_LEWIN_INTELLIGENT_TECHNOLOGY_CO__LTD_"
+	case KnxManufacturer_M_KG_POWER:
+		return "M_KG_POWER"
 	case KnxManufacturer_M_ABB___RESERVED:
 		return "M_ABB___RESERVED"
 	case KnxManufacturer_M_BUSCH_JAEGER_ELEKTRO___RESERVED: