You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/04 12:37:35 UTC
[plc4x] 02/03: feat(plc4xpcapanalyzer/cbus): echo in merge requests should now be discarded so numbering won't get messed up
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit c8893e012624bc05192445d6d67e7df1e218e806
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Aug 4 13:29:34 2022 +0200
feat(plc4xpcapanalyzer/cbus): echo in merge requests should now be discarded so numbering won't get messed up
---
.../internal/analyzer/analyzer.go | 2 +
.../internal/cbusanalyzer/analyzer.go | 126 +++++++++++++--------
.../plc4xpcapanalyzer/internal/common/common.go | 4 +
3 files changed, 85 insertions(+), 47 deletions(-)
diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go b/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
index c53c06191..17b9f147b 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/analyzer/analyzer.go
@@ -126,6 +126,8 @@ func Analyze(pcapFile, protocolType string) {
log.Info().Stringer("packetInformation", packetInformation).Msgf("No.[%d] is unterminated", realPacketNumber)
case common.ErrEmptyPackage:
log.Info().Stringer("packetInformation", packetInformation).Msgf("No.[%d] is empty", realPacketNumber)
+ case common.ErrEcho:
+ log.Info().Stringer("packetInformation", packetInformation).Msgf("No.[%d] is echo", realPacketNumber)
default:
parseFails++
// TODO: write report to xml or something
diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go b/plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go
index f6573b92f..5753427ea 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/cbusanalyzer/analyzer.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/plc4x/plc4go/tools/plc4xpcapanalyzer/internal/common"
"github.com/google/gopacket"
"github.com/pkg/errors"
+ "github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"net"
"reflect"
@@ -41,6 +42,9 @@ type Analyzer struct {
currentInboundPayloads map[string][]byte
currentPrefilterInboundPayloads map[string][]byte
mappedPacketChan chan gopacket.Packet
+
+ lastParsePayload []byte
+ lastMapPayload []byte
}
func (a *Analyzer) Init() {
@@ -76,14 +80,26 @@ func (a *Analyzer) PackageParse(packetInformation common.PacketInformation, payl
true,
)
}
- currentPayload, err := a.getCurrentPayload(packetInformation, payload, isResponse, true, false)
+ mergeCallback := func(index int) {
+ log.Warn().Stringer("packetInformation", packetInformation).Msgf("we have a split at index %d", index)
+ }
+ currentPayload, err := a.getCurrentPayload(packetInformation, payload, mergeCallback, a.currentInboundPayloads, &a.lastParsePayload)
if err != nil {
return nil, err
}
- // TODO: apparently we only do crc on receive with our tests so we need to implement that
+ if reflect.DeepEqual(currentPayload, a.lastParsePayload) {
+ return nil, common.ErrEcho
+ }
+ a.lastParsePayload = currentPayload
parse, err := model.CBusMessageParse(utils.NewReadBufferByteBased(currentPayload), isResponse, a.requestContext, cBusOptions)
if err != nil {
- return nil, errors.Wrap(err, "Error parsing CBusCommand")
+ if secondParse, err := model.CBusMessageParse(utils.NewReadBufferByteBased(currentPayload), isResponse, model.NewRequestContext(false, false, false), model.NewCBusOptions(false, false, false, false, false, false, false, false, false)); err != nil {
+ log.Debug().Err(err).Msg("Second parse failed too")
+ return nil, errors.Wrap(err, "Error parsing CBusCommand")
+ } else {
+ log.Warn().Stringer("packetInformation", packetInformation).Msgf("package got overridden by second parse... probably a MMI\n%s", secondParse)
+ parse = secondParse
+ }
}
a.requestContext = cbus.CreateRequestContextWithInfoCallback(parse, func(infoString string) {
log.Debug().Msgf("No.[%d] %s", packetInformation.PacketNumber, infoString)
@@ -98,64 +114,33 @@ func (a *Analyzer) isResponse(packetInformation common.PacketInformation) bool {
return isResponse
}
-func (a *Analyzer) getCurrentPayload(packetInformation common.PacketInformation, payload []byte, isResponse bool, warnAboutSplit bool, usePrefilterPayloads bool) ([]byte, error) {
+func (a *Analyzer) getCurrentPayload(packetInformation common.PacketInformation, payload []byte, mergeCallback func(int), currentInboundPayloads map[string][]byte, lastPayload *[]byte) ([]byte, error) {
srcUip := packetInformation.SrcIp.String()
payload = filterXOnXOff(payload)
if len(payload) == 0 {
return nil, common.ErrEmptyPackage
}
// Check if we have a termination in the middle
- currentInboundPayloads := a.currentInboundPayloads
- if usePrefilterPayloads {
- currentInboundPayloads = a.currentPrefilterInboundPayloads
- }
currentPayload := currentInboundPayloads[srcUip]
- currentPayload = append(currentPayload, payload...)
- shouldClearInboundPayload := true
- isMergedMessage := false
- // Check if we have a merged message
-mergeCheck:
- for i, b := range currentPayload {
- if i == 0 {
- // TODO: we ignore the first byte as this is typical for reset etc... so maybe this is good or bad we will see
- continue
- }
- switch b {
- case 0x0D:
- if i+1 < len(currentPayload) && currentPayload[i+1] == 0x0A {
- // If we know the next is a newline we jump to that index...
- i++
- }
- // ... other than that the logic is the same
- fallthrough
- case 0x0A:
- // We have a merged message if we are not at the end
- if i < len(currentPayload)-1 {
- event := log.Warn()
- if !warnAboutSplit {
- event = log.Debug()
- }
- event.Stringer("packetInformation", packetInformation).Msgf("we have a split at index %d (usePrefilterPayloads=%t)", i, usePrefilterPayloads)
- // In this case we need to put the tail into our "buffer"
- currentInboundPayloads[srcUip] = currentPayload[i+1:]
- // and use the beginning as current payload
- currentPayload = currentPayload[:i+1]
- shouldClearInboundPayload = false
- isMergedMessage = true
- break mergeCheck
- }
- }
+ if currentPayload != nil {
+ log.Debug().Func(func(e *zerolog.Event) {
+ e.Msgf("Prepending current payload %+q to actual payload %+q: %+q", currentPayload, payload, append(currentPayload, payload...))
+ })
+ currentPayload = append(currentPayload, payload...)
+ } else {
+ currentPayload = payload
}
+ isMergedMessage, shouldClearInboundPayload := mergeCheck(¤tPayload, srcUip, mergeCallback, currentInboundPayloads, lastPayload)
if !isMergedMessage {
// When we have a merge message we already set the current payload to the tail
currentInboundPayloads[srcUip] = currentPayload
} else {
log.Debug().Stringer("packetInformation", packetInformation).Msgf("Remainder %+q", currentInboundPayloads[srcUip])
}
- if lastElement := currentPayload[len(currentPayload)-1]; (!isResponse /*a request must end with cr*/ && lastElement != 0x0D /*cr*/) || (isResponse /*a response must end with lf*/ && lastElement != 0x0A /*lf*/) {
+ if lastElement := currentPayload[len(currentPayload)-1]; (lastElement != '\r') && (lastElement != '\n') {
return nil, common.ErrUnterminatedPackage
} else {
- log.Debug().Msgf("Last element 0x%x", lastElement)
+ log.Debug().Stringer("packetInformation", packetInformation).Msgf("Last element 0x%x", lastElement)
if shouldClearInboundPayload {
if currentSavedPayload := currentInboundPayloads[srcUip]; currentSavedPayload != nil {
// We remove our current payload from the beginning of the cache
@@ -172,12 +157,55 @@ mergeCheck:
return currentPayload, nil
}
+// mergeCheck searches *currentPayload (the first byte is skipped) for the first CR, CRLF or LF which is not at its end.
+// If the head up to that terminator equals *lastPayload it is an echo: it is cut off and the check restarts on the tail.
+// Otherwise mergeCallback (if not nil) gets the split index, the tail is stored in currentInboundPayloads[srcUip],
+// *currentPayload is cut down to the head and (true, false) is returned. Without such a split (false, true) is returned.
+func mergeCheck(currentPayload *[]byte, srcUip string, mergeCallback func(int), currentInboundPayloads map[string][]byte, lastPayload *[]byte) (isMergedMessage, shouldClearInboundPayload bool) {
+	for i, b := range *currentPayload {
+		if i == 0 {
+			// we ignore the first byte as this is typical for reset etc... so maybe this is good or bad we will see
+			continue
+		}
+		switch b {
+		case 0x0D:
+			if i+1 < len(*currentPayload) && (*currentPayload)[i+1] == 0x0A {
+				// If we know the next is a newline we jump to that index, the remaining logic is shared with 0x0A
+				i++
+			}
+			fallthrough
+		case 0x0A:
+			// We have a merged message if we are not at the end
+			if i < len(*currentPayload)-1 {
+				headPayload := (*currentPayload)[:i+1]
+				tailPayload := (*currentPayload)[i+1:]
+				if reflect.DeepEqual(headPayload, *lastPayload) {
+					// The head is an echo of the last payload. We discard it here to not offset all numbers
+					*currentPayload = tailPayload
+					log.Debug().Msgf("We cut the echo message %+q out of the response to keep numbering, remaining payload %+q", headPayload, *currentPayload)
+					return mergeCheck(currentPayload, srcUip, mergeCallback, currentInboundPayloads, lastPayload)
+				}
+				if mergeCallback != nil {
+					mergeCallback(i)
+				}
+				// Put the tail into our "buffer" and use the head as current payload
+				currentInboundPayloads[srcUip] = tailPayload
+				*currentPayload = headPayload
+				return true, false
+			}
+		}
+	}
+	return false, true
+}
+
func filterXOnXOff(payload []byte) []byte {
n := 0
- for _, b := range payload {
+ for i, b := range payload {
switch b {
case 0x11: // Filter XON
+ fallthrough
case 0x13: // Filter XOFF
+ log.Trace().Msgf("Filtering %x at %d for %+q", b, i, payload)
default:
payload[n] = b
n++
@@ -277,7 +305,10 @@ func (a *Analyzer) MapPackets(in chan gopacket.Packet, packetInformationCreator
a.mappedPacketChan <- packet
default:
packetInformation := packetInformationCreator(packet)
- if payload, err := a.getCurrentPayload(packetInformation, packet.ApplicationLayer().Payload(), a.isResponse(packetInformation), false, true); err != nil {
+ mergeCallback := func(index int) {
+ log.Warn().Stringer("packetInformation", packetInformation).Msgf("we have a split at index %d", index)
+ }
+ if payload, err := a.getCurrentPayload(packetInformation, packet.ApplicationLayer().Payload(), mergeCallback, a.currentPrefilterInboundPayloads, &a.lastMapPayload); err != nil {
log.Debug().Err(err).Stringer("packetInformation", packetInformation).Msg("Filtering message")
a.mappedPacketChan <- common.NewFilteredPackage(err, packet)
} else {
@@ -287,6 +318,7 @@ func (a *Analyzer) MapPackets(in chan gopacket.Packet, packetInformationCreator
log.Debug().Msgf("Replacing payload %q with %q", currentApplicationLayer.Payload(), payload)
packet = &manipulatedPackage{Packet: packet, newApplicationLayer: newPayload}
}
+ a.lastMapPayload = payload
a.mappedPacketChan <- packet
}
}
diff --git a/plc4go/tools/plc4xpcapanalyzer/internal/common/common.go b/plc4go/tools/plc4xpcapanalyzer/internal/common/common.go
index e914a27ad..bd20adabb 100644
--- a/plc4go/tools/plc4xpcapanalyzer/internal/common/common.go
+++ b/plc4go/tools/plc4xpcapanalyzer/internal/common/common.go
@@ -42,8 +42,12 @@ func (p PacketInformation) String() string {
// ErrUnterminatedPackage is used when a transmission is incomplete (usually when package is split)
var ErrUnterminatedPackage = errors.New("ErrUnterminatedPackage")
+// ErrEmptyPackage is used when there is no payload
var ErrEmptyPackage = errors.New("ErrEmptyPackage")
+// ErrEcho is used when the package is an echo of the previous one
+var ErrEcho = errors.New("ErrEcho")
+
type FilteredPackage interface {
gopacket.Packet
IsFilteredPackage() bool