You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/04 12:37:35 UTC

[plc4x] 02/03: feat(plc4xpcapanalyzer/cbus): echo in merge requests should now be discarded so numbering won't get messed up

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit c8893e012624bc05192445d6d67e7df1e218e806
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Aug 4 13:29:34 2022 +0200

    feat(plc4xpcapanalyzer/cbus): echo in merge requests should now be discarded so numbering won't get messed up
---
 .../internal/analyzer/analyzer.go                  |   2 +
 .../internal/cbusanalyzer/analyzer.go              | 126 +++++++++++++--------
 .../plc4xpcapanalyzer/internal/common/common.go    |   4 +
 3 files changed, 85 insertions(+), 47 deletions(-)

diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go b/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
index c53c06191..17b9f147b 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
@@ -126,6 +126,8 @@ func Analyze(pcapFile, protocolType string) {
 				log.Info().Stringer("packetInformation", packetInformation).Msgf("No.[%d] is unterminated", realPacketNumber)
 			case common.ErrEmptyPackage:
 				log.Info().Stringer("packetInformation", packetInformation).Msgf("No.[%d] is empty", realPacketNumber)
+			case common.ErrEcho:
+				log.Info().Stringer("packetInformation", packetInformation).Msgf("No.[%d] is echo", realPacketNumber)
 			default:
 				parseFails++
 				// TODO: write report to xml or something
diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go b/plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go
index f6573b92f..5753427ea 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/plc4x/plc4go/tools/plc4xpcapanalyzer/internal/common"
 	"github.com/google/gopacket"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
 	"net"
 	"reflect"
@@ -41,6 +42,9 @@ type Analyzer struct {
 	currentInboundPayloads          map[string][]byte
 	currentPrefilterInboundPayloads map[string][]byte
 	mappedPacketChan                chan gopacket.Packet
+
+	lastParsePayload []byte
+	lastMapPayload   []byte
 }
 
 func (a *Analyzer) Init() {
@@ -76,14 +80,26 @@ func (a *Analyzer) PackageParse(packetInformation common.PacketInformation, payl
 			true,
 		)
 	}
-	currentPayload, err := a.getCurrentPayload(packetInformation, payload, isResponse, true, false)
+	mergeCallback := func(index int) {
+		log.Warn().Stringer("packetInformation", packetInformation).Msgf("we have a split at index %d", index)
+	}
+	currentPayload, err := a.getCurrentPayload(packetInformation, payload, mergeCallback, a.currentInboundPayloads, &a.lastParsePayload)
 	if err != nil {
 		return nil, err
 	}
-	// TODO: apparently we only do crc on receive with our tests so we need to implement that
+	if reflect.DeepEqual(currentPayload, a.lastParsePayload) {
+		return nil, common.ErrEcho
+	}
+	a.lastParsePayload = currentPayload
 	parse, err := model.CBusMessageParse(utils.NewReadBufferByteBased(currentPayload), isResponse, a.requestContext, cBusOptions)
 	if err != nil {
-		return nil, errors.Wrap(err, "Error parsing CBusCommand")
+		if secondParse, err := model.CBusMessageParse(utils.NewReadBufferByteBased(currentPayload), isResponse, model.NewRequestContext(false, false, false), model.NewCBusOptions(false, false, false, false, false, false, false, false, false)); err != nil {
+			log.Debug().Err(err).Msg("Second parse failed too")
+			return nil, errors.Wrap(err, "Error parsing CBusCommand")
+		} else {
+			log.Warn().Stringer("packetInformation", packetInformation).Msgf("package got overridden by second parse... probably a MMI\n%s", secondParse)
+			parse = secondParse
+		}
 	}
 	a.requestContext = cbus.CreateRequestContextWithInfoCallback(parse, func(infoString string) {
 		log.Debug().Msgf("No.[%d] %s", packetInformation.PacketNumber, infoString)
@@ -98,64 +114,33 @@ func (a *Analyzer) isResponse(packetInformation common.PacketInformation) bool {
 	return isResponse
 }
 
-func (a *Analyzer) getCurrentPayload(packetInformation common.PacketInformation, payload []byte, isResponse bool, warnAboutSplit bool, usePrefilterPayloads bool) ([]byte, error) {
+func (a *Analyzer) getCurrentPayload(packetInformation common.PacketInformation, payload []byte, mergeCallback func(int), currentInboundPayloads map[string][]byte, lastPayload *[]byte) ([]byte, error) {
 	srcUip := packetInformation.SrcIp.String()
 	payload = filterXOnXOff(payload)
 	if len(payload) == 0 {
 		return nil, common.ErrEmptyPackage
 	}
 	// Check if we have a termination in the middle
-	currentInboundPayloads := a.currentInboundPayloads
-	if usePrefilterPayloads {
-		currentInboundPayloads = a.currentPrefilterInboundPayloads
-	}
 	currentPayload := currentInboundPayloads[srcUip]
-	currentPayload = append(currentPayload, payload...)
-	shouldClearInboundPayload := true
-	isMergedMessage := false
-	// Check if we have a merged message
-mergeCheck:
-	for i, b := range currentPayload {
-		if i == 0 {
-			// TODO: we ignore the first byte as this is typical for reset etc... so maybe this is good or bad we will see
-			continue
-		}
-		switch b {
-		case 0x0D:
-			if i+1 < len(currentPayload) && currentPayload[i+1] == 0x0A {
-				// If we know the next is a newline we jump to that index...
-				i++
-			}
-			// ... other than that the logic is the same
-			fallthrough
-		case 0x0A:
-			// We have a merged message if we are not at the end
-			if i < len(currentPayload)-1 {
-				event := log.Warn()
-				if !warnAboutSplit {
-					event = log.Debug()
-				}
-				event.Stringer("packetInformation", packetInformation).Msgf("we have a split at index %d (usePrefilterPayloads=%t)", i, usePrefilterPayloads)
-				// In this case we need to put the tail into our "buffer"
-				currentInboundPayloads[srcUip] = currentPayload[i+1:]
-				// and use the beginning as current payload
-				currentPayload = currentPayload[:i+1]
-				shouldClearInboundPayload = false
-				isMergedMessage = true
-				break mergeCheck
-			}
-		}
+	if currentPayload != nil {
+		log.Debug().Func(func(e *zerolog.Event) {
+			e.Msgf("Prepending current payload %+q to actual payload %+q: %+q", currentPayload, payload, append(currentPayload, payload...))
+		})
+		currentPayload = append(currentPayload, payload...)
+	} else {
+		currentPayload = payload
 	}
+	isMergedMessage, shouldClearInboundPayload := mergeCheck(&currentPayload, srcUip, mergeCallback, currentInboundPayloads, lastPayload)
 	if !isMergedMessage {
 		// When we have a merge message we already set the current payload to the tail
 		currentInboundPayloads[srcUip] = currentPayload
 	} else {
 		log.Debug().Stringer("packetInformation", packetInformation).Msgf("Remainder %+q", currentInboundPayloads[srcUip])
 	}
-	if lastElement := currentPayload[len(currentPayload)-1]; (!isResponse /*a request must end with cr*/ && lastElement != 0x0D /*cr*/) || (isResponse /*a response must end with lf*/ && lastElement != 0x0A /*lf*/) {
+	if lastElement := currentPayload[len(currentPayload)-1]; (lastElement != '\r') && (lastElement != '\n') {
 		return nil, common.ErrUnterminatedPackage
 	} else {
-		log.Debug().Msgf("Last element 0x%x", lastElement)
+		log.Debug().Stringer("packetInformation", packetInformation).Msgf("Last element 0x%x", lastElement)
 		if shouldClearInboundPayload {
 			if currentSavedPayload := currentInboundPayloads[srcUip]; currentSavedPayload != nil {
 				// We remove our current payload from the beginning of the cache
@@ -172,12 +157,55 @@ mergeCheck:
 	return currentPayload, nil
 }
 
+func mergeCheck(currentPayload *[]byte, srcUip string, mergeCallback func(int), currentInboundPayloads map[string][]byte, lastPayload *[]byte) (isMergedMessage, shouldClearInboundPayload bool) {
+	// Check if we have a merged message
+	for i, b := range *currentPayload {
+		if i == 0 {
+			// we ignore the first byte as this is typical for reset etc... so maybe this is good or bad we will see
+			continue
+		}
+		switch b {
+		case 0x0D:
+			if i+1 < len(*currentPayload) && (*currentPayload)[i+1] == 0x0A {
+				// If we know the next is a newline we jump to that index...
+				i++
+			}
+			// ... other than that the logic is the same
+			fallthrough
+		case 0x0A:
+			// We have a merged message if we are not at the end
+			if i < len(*currentPayload)-1 {
+				headPayload := (*currentPayload)[:i+1]
+				tailPayload := (*currentPayload)[i+1:]
+				if reflect.DeepEqual(headPayload, *lastPayload) {
+					// This means that we have a merge where the last payload is an echo. In that case we discard that here to not offset all numbers
+					*currentPayload = tailPayload
+					log.Debug().Msgf("We cut the echo message %s out of the response to keep numbering", headPayload, *currentPayload)
+					return mergeCheck(currentPayload, srcUip, mergeCallback, currentInboundPayloads, lastPayload)
+				} else {
+					if mergeCallback != nil {
+						mergeCallback(i)
+					}
+					// In this case we need to put the tail into our "buffer"
+					currentInboundPayloads[srcUip] = tailPayload
+					// and use the beginning as current payload
+					*currentPayload = headPayload
+					return true, false
+				}
+			}
+		}
+	}
+	return false, true
+}
+
 func filterXOnXOff(payload []byte) []byte {
 	n := 0
-	for _, b := range payload {
+	for i, b := range payload {
 		switch b {
 		case 0x11: // Filter XON
+			fallthrough
 		case 0x13: // Filter XOFF
+			log.Trace().Msgf("Filtering %x at %d for %+q", b, i, payload)
 		default:
 			payload[n] = b
 			n++
@@ -277,7 +305,10 @@ func (a *Analyzer) MapPackets(in chan gopacket.Packet, packetInformationCreator
 					a.mappedPacketChan <- packet
 				default:
 					packetInformation := packetInformationCreator(packet)
-					if payload, err := a.getCurrentPayload(packetInformation, packet.ApplicationLayer().Payload(), a.isResponse(packetInformation), false, true); err != nil {
+					mergeCallback := func(index int) {
+						log.Warn().Stringer("packetInformation", packetInformation).Msgf("we have a split at index %d", index)
+					}
+					if payload, err := a.getCurrentPayload(packetInformation, packet.ApplicationLayer().Payload(), mergeCallback, a.currentPrefilterInboundPayloads, &a.lastMapPayload); err != nil {
 						log.Debug().Err(err).Stringer("packetInformation", packetInformation).Msg("Filtering message")
 						a.mappedPacketChan <- common.NewFilteredPackage(err, packet)
 					} else {
@@ -287,6 +318,7 @@ func (a *Analyzer) MapPackets(in chan gopacket.Packet, packetInformationCreator
 							log.Debug().Msgf("Replacing payload %q with %q", currentApplicationLayer.Payload(), payload)
 							packet = &manipulatedPackage{Packet: packet, newApplicationLayer: newPayload}
 						}
+						a.lastMapPayload = payload
 						a.mappedPacketChan <- packet
 					}
 				}
diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/common/common.go b/plc4go/tools/plc4xpcapanalyzer/internal/common/common.go
index e914a27ad..bd20adabb 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/common/common.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/common/common.go
@@ -42,8 +42,12 @@ func (p PacketInformation) String() string {
 // ErrUnterminatedPackage is used when a transmission is incomplete (usually when package is split)
 var ErrUnterminatedPackage = errors.New("ErrUnterminatedPackage")
 
+// ErrEmptyPackage is used when there is no payload
 var ErrEmptyPackage = errors.New("ErrEmptyPackage")
 
+// ErrEcho is used when the package is a echo from the previous
+var ErrEcho = errors.New("ErrEcho")
+
 type FilteredPackage interface {
 	gopacket.Packet
 	IsFilteredPackage() bool