You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/01/13 16:15:21 UTC

[plc4x] branch develop updated: feat(plc4go/bacnet): first comm went through new stack

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2265a6ad5b feat(plc4go/bacnet): first comm went through new stack
Commit 2265a6ad5b is described below.

commit 2265a6ad5b4b947c2c66d027cd7028796d8c197d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jan 13 17:10:32 2023 +0100

    feat(plc4go/bacnet): first comm went through new stack
---
 plc4go/internal/bacnetip/NetworkService.go         |  4 +-
 plc4go/internal/bacnetip/PDU.go                    | 46 +++++++++++++-
 .../internal/bacnetip/UDPCommunicationsModule.go   | 73 +++++++++++++++-------
 3 files changed, 97 insertions(+), 26 deletions(-)

diff --git a/plc4go/internal/bacnetip/NetworkService.go b/plc4go/internal/bacnetip/NetworkService.go
index 4992235732..e60d1c1105 100644
--- a/plc4go/internal/bacnetip/NetworkService.go
+++ b/plc4go/internal/bacnetip/NetworkService.go
@@ -459,6 +459,7 @@ func buildNPDU(hopCount uint8, source *Address, destination *Address, expectingR
 	var destinationNetworkAddress *uint16
 	var destinationLength *uint8
 	var destinationAddress []uint8
+	var destinationHopCount *uint8
 	if destinationSpecified {
 		destinationSpecified = true
 		destinationNetworkAddress = destination.AddrNet
@@ -473,9 +474,10 @@ func buildNPDU(hopCount uint8, source *Address, destination *Address, expectingR
 			// If we define the len 0 we must not send the array
 			destinationAddress = nil
 		}
+		destinationHopCount = &hopCount
 	}
 	control := readWriteModel.NewNPDUControl(false, destinationSpecified, sourceSpecified, expectingReply, networkPriority)
-	return readWriteModel.NewNPDU(1, control, destinationNetworkAddress, destinationLength, destinationAddress, sourceNetworkAddress, sourceLength, sourceAddress, &hopCount, nil, apdu, 0), nil
+	return readWriteModel.NewNPDU(1, control, destinationNetworkAddress, destinationLength, destinationAddress, sourceNetworkAddress, sourceLength, sourceAddress, destinationHopCount, nil, apdu, 0), nil
 }
 
 func (n *NetworkServiceAccessPoint) ProcessNPDU(adapter *NetworkAdapter, pdu _PDU) error {
diff --git a/plc4go/internal/bacnetip/PDU.go b/plc4go/internal/bacnetip/PDU.go
index 35fe6b18b8..64bf599254 100644
--- a/plc4go/internal/bacnetip/PDU.go
+++ b/plc4go/internal/bacnetip/PDU.go
@@ -29,6 +29,7 @@ import (
 	"net"
 	"reflect"
 	"regexp"
+	"strings"
 )
 
 type AddressType int
@@ -163,7 +164,10 @@ func (a *Address) decodeAddress(addr interface{}) error {
 		case net.Addr:
 			// TODO: hacked in udp support
 			udpAddr := addr.(*net.UDPAddr)
-			a.AddrAddress = udpAddr.IP
+			a.AddrAddress = udpAddr.IP.To4()
+			if a.AddrAddress == nil {
+				a.AddrAddress = udpAddr.IP.To16()
+			}
 			length := uint32(len(a.AddrAddress))
 			a.AddrLen = &length
 			port := uint16(udpAddr.Port)
@@ -300,7 +304,45 @@ func (a *Address) Equals(other interface{}) bool {
 }
 
 func (a *Address) String() string {
-	return fmt.Sprintf("%#v", a)
+	if a == nil {
+		return "<nil>"
+	}
+	var sb strings.Builder
+	sb.WriteString(a.AddrType.String())
+	if a.AddrNet != nil {
+		_, _ = fmt.Fprintf(&sb, ", net: %d", *a.AddrNet)
+	}
+	if len(a.AddrAddress) > 0 {
+		_, _ = fmt.Fprintf(&sb, ", address: %d", a.AddrAddress)
+	}
+	if a.AddrLen != nil {
+		_, _ = fmt.Fprintf(&sb, " with len %d", *a.AddrLen)
+	}
+	if a.AddrRoute != nil {
+		_, _ = fmt.Fprintf(&sb, ", route: %s", a.AddrRoute)
+	}
+	if a.AddrIP != nil {
+		_, _ = fmt.Fprintf(&sb, ", ip: %d", *a.AddrIP)
+	}
+	if a.AddrMask != nil {
+		_, _ = fmt.Fprintf(&sb, ", mask: %d", *a.AddrMask)
+	}
+	if a.AddrHost != nil {
+		_, _ = fmt.Fprintf(&sb, ", host: %d", *a.AddrHost)
+	}
+	if a.AddrSubnet != nil {
+		_, _ = fmt.Fprintf(&sb, ", subnet: %d", *a.AddrSubnet)
+	}
+	if a.AddrPort != nil {
+		_, _ = fmt.Fprintf(&sb, ", port: %d", *a.AddrPort)
+	}
+	if a.AddrTuple != nil {
+		_, _ = fmt.Fprintf(&sb, ", tuple: %s", a.AddrTuple)
+	}
+	if a.AddrBroadcastTuple != nil {
+		_, _ = fmt.Fprintf(&sb, ", broadcast tuple: %s", a.AddrBroadcastTuple)
+	}
+	return sb.String()
 }
 
 func portToUint16(port []byte) uint16 {
diff --git a/plc4go/internal/bacnetip/UDPCommunicationsModule.go b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
index f64c27c296..b39150ef26 100644
--- a/plc4go/internal/bacnetip/UDPCommunicationsModule.go
+++ b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
@@ -20,10 +20,9 @@
 package bacnetip
 
 import (
-	"bufio"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
-	"github.com/apache/plc4x/plc4go/spi/transports/udp"
+	"github.com/libp2p/go-reuseport"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"net"
@@ -109,7 +108,7 @@ type UDPDirector struct {
 	timeout uint32
 	reuse   bool
 	address AddressTuple[string, uint16]
-	ti      *udp.TransportInstance
+	udpConn *net.UDPConn
 
 	actorClass func(*UDPDirector, string) *UDPActor
 	request    chan _PDU
@@ -149,9 +148,16 @@ func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *b
 	if err != nil {
 		return nil, errors.Wrap(err, "error resolving udp address")
 	}
-	d.ti = udp.NewTransportInstance(resolvedAddress, nil, d.timeout, d.reuse, nil)
-	if err := d.ti.Connect(); err != nil {
-		return nil, errors.Wrap(err, "error connecting transport instance")
+	if d.reuse {
+		if packetConn, err := reuseport.ListenPacket("udp", resolvedAddress.String()); err != nil {
+			return nil, errors.Wrap(err, "error connecting to local address")
+		} else {
+			d.udpConn = packetConn.(*net.UDPConn)
+		}
+	} else {
+		if d.udpConn, err = net.ListenUDP("udp", resolvedAddress); err != nil {
+			return nil, errors.Wrap(err, "error connecting to local address")
+		}
 	}
 
 	d.running = true
@@ -164,7 +170,28 @@ func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *b
 	// create the request queue
 	d.request = make(chan _PDU)
 	go func() {
-		// TODO: get requests and send them...
+		for {
+			pdu := <-d.request
+			serialize, err := pdu.GetMessage().Serialize()
+			if err != nil {
+				log.Error().Err(err).Msg("Error building message")
+				continue
+			}
+			// TODO: wonky address object
+			destination := pdu.GetPDUDestination()
+			addr := net.IPv4(destination.AddrAddress[0], destination.AddrAddress[1], destination.AddrAddress[2], destination.AddrAddress[3])
+			udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, *destination.AddrPort))
+			if err != nil {
+				log.Error().Err(err).Msg("Error resolving address")
+				continue
+			}
+			writtenBytes, err := d.udpConn.WriteToUDP(serialize, udpAddr)
+			if err != nil {
+				log.Error().Err(err).Msg("Error writing bytes")
+				continue
+			}
+			log.Debug().Msgf("%d written bytes", writtenBytes)
+		}
 	}()
 
 	// start with an empty peer pool
@@ -213,31 +240,31 @@ func (d *UDPDirector) ActorError(err error) {
 
 func (d *UDPDirector) Close() error {
 	d.running = false
-	return d.ti.Close()
+	return d.udpConn.Close()
 }
 
 func (d *UDPDirector) handleRead() {
 	log.Debug().Msgf("handleRead(%v)", d.address)
 
-	if err := d.ti.FillBuffer(func(pos uint, _ byte, _ *bufio.Reader) bool {
-		if pos >= 4 {
-			return false
-		}
-		return true
-	}); err != nil {
-		// pass along to a handler
-		d.handleError(errors.Wrap(err, "error filling buffer"))
+	firstFourBytes := make([]byte, 4)
+	if read, err := d.udpConn.Read(firstFourBytes); err != nil {
+		log.Error().Err(err).Msg("error reading")
 		return
-	}
-	peekedBytes, err := d.ti.PeekReadableBytes(4)
-	if err != nil {
-		// pass along to a handler
-		d.handleError(errors.Wrap(err, "error peeking 4 bytes"))
+	} else if read != 4 {
+		log.Error().Msgf("Not enough data %d", read)
 		return
 	}
 
-	length := uint32(peekedBytes[2])<<8 | uint32(peekedBytes[3])
-	readBytes, err := d.ti.Read(length)
+	length := uint32(firstFourBytes[2])<<8 | uint32(firstFourBytes[3])
+	remainingMessage := make([]byte, length-4)
+	if read, err := d.udpConn.Read(remainingMessage); err != nil {
+		log.Error().Err(err).Msg("error reading")
+		return
+	} else if read != int(length-4) {
+		log.Error().Msgf("Not enough data: actual: %d, wanted: %d", read, length-4)
+		return
+	}
+	readBytes := append(firstFourBytes, remainingMessage...)
 
 	bvlc, err := model.BVLCParse(readBytes)
 	if err != nil {