You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/01/13 16:15:21 UTC
[plc4x] branch develop updated: feat(plc4go/bacnet): first comm went through new stack
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 2265a6ad5b feat(plc4go/bacnet): first comm went through new stack
2265a6ad5b is described below
commit 2265a6ad5b4b947c2c66d027cd7028796d8c197d
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jan 13 17:10:32 2023 +0100
feat(plc4go/bacnet): first comm went through new stack
---
plc4go/internal/bacnetip/NetworkService.go | 4 +-
plc4go/internal/bacnetip/PDU.go | 46 +++++++++++++-
.../internal/bacnetip/UDPCommunicationsModule.go | 73 +++++++++++++++-------
3 files changed, 97 insertions(+), 26 deletions(-)
diff --git a/plc4go/internal/bacnetip/NetworkService.go b/plc4go/internal/bacnetip/NetworkService.go
index 4992235732..e60d1c1105 100644
--- a/plc4go/internal/bacnetip/NetworkService.go
+++ b/plc4go/internal/bacnetip/NetworkService.go
@@ -459,6 +459,7 @@ func buildNPDU(hopCount uint8, source *Address, destination *Address, expectingR
var destinationNetworkAddress *uint16
var destinationLength *uint8
var destinationAddress []uint8
+ var destinationHopCount *uint8
if destinationSpecified {
destinationSpecified = true
destinationNetworkAddress = destination.AddrNet
@@ -473,9 +474,10 @@ func buildNPDU(hopCount uint8, source *Address, destination *Address, expectingR
// If we define the len 0 we must not send the array
destinationAddress = nil
}
+ destinationHopCount = &hopCount
}
control := readWriteModel.NewNPDUControl(false, destinationSpecified, sourceSpecified, expectingReply, networkPriority)
- return readWriteModel.NewNPDU(1, control, destinationNetworkAddress, destinationLength, destinationAddress, sourceNetworkAddress, sourceLength, sourceAddress, &hopCount, nil, apdu, 0), nil
+ return readWriteModel.NewNPDU(1, control, destinationNetworkAddress, destinationLength, destinationAddress, sourceNetworkAddress, sourceLength, sourceAddress, destinationHopCount, nil, apdu, 0), nil
}
func (n *NetworkServiceAccessPoint) ProcessNPDU(adapter *NetworkAdapter, pdu _PDU) error {
diff --git a/plc4go/internal/bacnetip/PDU.go b/plc4go/internal/bacnetip/PDU.go
index 35fe6b18b8..64bf599254 100644
--- a/plc4go/internal/bacnetip/PDU.go
+++ b/plc4go/internal/bacnetip/PDU.go
@@ -29,6 +29,7 @@ import (
"net"
"reflect"
"regexp"
+ "strings"
)
type AddressType int
@@ -163,7 +164,10 @@ func (a *Address) decodeAddress(addr interface{}) error {
case net.Addr:
// TODO: hacked in udp support
udpAddr := addr.(*net.UDPAddr)
- a.AddrAddress = udpAddr.IP
+ a.AddrAddress = udpAddr.IP.To4()
+ if a.AddrAddress == nil {
+ a.AddrAddress = udpAddr.IP.To16()
+ }
length := uint32(len(a.AddrAddress))
a.AddrLen = &length
port := uint16(udpAddr.Port)
@@ -300,7 +304,45 @@ func (a *Address) Equals(other interface{}) bool {
}
func (a *Address) String() string {
- return fmt.Sprintf("%#v", a)
+ if a == nil {
+ return "<nil>"
+ }
+ var sb strings.Builder
+ sb.WriteString(a.AddrType.String())
+ if a.AddrNet != nil {
+ _, _ = fmt.Fprintf(&sb, ", net: %d", *a.AddrNet)
+ }
+ if len(a.AddrAddress) > 0 {
+ _, _ = fmt.Fprintf(&sb, ", address: %d", a.AddrAddress)
+ }
+ if a.AddrLen != nil {
+ _, _ = fmt.Fprintf(&sb, " with len %d", *a.AddrLen)
+ }
+ if a.AddrRoute != nil {
+ _, _ = fmt.Fprintf(&sb, ", route: %s", a.AddrRoute)
+ }
+ if a.AddrIP != nil {
+ _, _ = fmt.Fprintf(&sb, ", ip: %d", *a.AddrIP)
+ }
+ if a.AddrMask != nil {
+ _, _ = fmt.Fprintf(&sb, ", mask: %d", *a.AddrMask)
+ }
+ if a.AddrHost != nil {
+ _, _ = fmt.Fprintf(&sb, ", host: %d", *a.AddrHost)
+ }
+ if a.AddrSubnet != nil {
+ _, _ = fmt.Fprintf(&sb, ", subnet: %d", *a.AddrSubnet)
+ }
+ if a.AddrPort != nil {
+ _, _ = fmt.Fprintf(&sb, ", port: %d", *a.AddrPort)
+ }
+ if a.AddrTuple != nil {
+ _, _ = fmt.Fprintf(&sb, ", tuple: %s", a.AddrTuple)
+ }
+ if a.AddrBroadcastTuple != nil {
+ _, _ = fmt.Fprintf(&sb, ", broadcast tuple: %s", a.AddrBroadcastTuple)
+ }
+ return sb.String()
}
func portToUint16(port []byte) uint16 {
diff --git a/plc4go/internal/bacnetip/UDPCommunicationsModule.go b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
index f64c27c296..b39150ef26 100644
--- a/plc4go/internal/bacnetip/UDPCommunicationsModule.go
+++ b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
@@ -20,10 +20,9 @@
package bacnetip
import (
- "bufio"
"fmt"
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi/transports/udp"
+ "github.com/libp2p/go-reuseport"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"net"
@@ -109,7 +108,7 @@ type UDPDirector struct {
timeout uint32
reuse bool
address AddressTuple[string, uint16]
- ti *udp.TransportInstance
+ udpConn *net.UDPConn
actorClass func(*UDPDirector, string) *UDPActor
request chan _PDU
@@ -149,9 +148,16 @@ func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *b
if err != nil {
return nil, errors.Wrap(err, "error resolving udp address")
}
- d.ti = udp.NewTransportInstance(resolvedAddress, nil, d.timeout, d.reuse, nil)
- if err := d.ti.Connect(); err != nil {
- return nil, errors.Wrap(err, "error connecting transport instance")
+ if d.reuse {
+ if packetConn, err := reuseport.ListenPacket("udp", resolvedAddress.String()); err != nil {
+ return nil, errors.Wrap(err, "error connecting to local address")
+ } else {
+ d.udpConn = packetConn.(*net.UDPConn)
+ }
+ } else {
+ if d.udpConn, err = net.ListenUDP("udp", resolvedAddress); err != nil {
+ return nil, errors.Wrap(err, "error connecting to local address")
+ }
}
d.running = true
@@ -164,7 +170,28 @@ func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *b
// create the request queue
d.request = make(chan _PDU)
go func() {
- // TODO: get requests and send them...
+ for {
+ pdu := <-d.request
+ serialize, err := pdu.GetMessage().Serialize()
+ if err != nil {
+ log.Error().Err(err).Msg("Error building message")
+ continue
+ }
+ // TODO: wonky address object
+ destination := pdu.GetPDUDestination()
+ addr := net.IPv4(destination.AddrAddress[0], destination.AddrAddress[1], destination.AddrAddress[2], destination.AddrAddress[3])
+ udpAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", addr, *destination.AddrPort))
+ if err != nil {
+ log.Error().Err(err).Msg("Error resolving address")
+ continue
+ }
+ writtenBytes, err := d.udpConn.WriteToUDP(serialize, udpAddr)
+ if err != nil {
+ log.Error().Err(err).Msg("Error writing bytes")
+ continue
+ }
+ log.Debug().Msgf("%d written bytes", writtenBytes)
+ }
}()
// start with an empty peer pool
@@ -213,31 +240,31 @@ func (d *UDPDirector) ActorError(err error) {
func (d *UDPDirector) Close() error {
d.running = false
- return d.ti.Close()
+ return d.udpConn.Close()
}
func (d *UDPDirector) handleRead() {
log.Debug().Msgf("handleRead(%v)", d.address)
- if err := d.ti.FillBuffer(func(pos uint, _ byte, _ *bufio.Reader) bool {
- if pos >= 4 {
- return false
- }
- return true
- }); err != nil {
- // pass along to a handler
- d.handleError(errors.Wrap(err, "error filling buffer"))
+ firstFourBytes := make([]byte, 4)
+ if read, err := d.udpConn.Read(firstFourBytes); err != nil {
+ log.Error().Err(err).Msg("error reading")
return
- }
- peekedBytes, err := d.ti.PeekReadableBytes(4)
- if err != nil {
- // pass along to a handler
- d.handleError(errors.Wrap(err, "error peeking 4 bytes"))
+ } else if read != 4 {
+ log.Error().Msgf("Not enough data %d", read)
return
}
- length := uint32(peekedBytes[2])<<8 | uint32(peekedBytes[3])
- readBytes, err := d.ti.Read(length)
+ length := uint32(firstFourBytes[2])<<8 | uint32(firstFourBytes[3])
+ remainingMessage := make([]byte, length-4)
+ if read, err := d.udpConn.Read(remainingMessage); err != nil {
+ log.Error().Err(err).Msg("error reading")
+ return
+ } else if read != int(length-4) {
+ log.Error().Msgf("Not enough data: actual: %d, wanted: %d", read, length-4)
+ return
+ }
+ readBytes := append(firstFourBytes, remainingMessage...)
bvlc, err := model.BVLCParse(readBytes)
if err != nil {