You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/01/16 14:41:57 UTC
[plc4x] branch develop updated: feat(plc4go/bacnet): basic comm working
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 574dd3f633 feat(plc4go/bacnet): basic comm working
574dd3f633 is described below
commit 574dd3f633a8c256f03bb35554eb5b517275540a
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jan 16 15:41:49 2023 +0100
feat(plc4go/bacnet): basic comm working
---
plc4go/internal/bacnetip/ApplicationLayer.go | 5 +-
plc4go/internal/bacnetip/ApplicationModule.go | 4 +-
.../bacnetip/BACnetVirtualLinkLayerService.go | 46 ++-
plc4go/internal/bacnetip/IOCBModule.go | 16 +-
plc4go/internal/bacnetip/MessageCodec.go | 12 +-
plc4go/internal/bacnetip/NetworkService.go | 322 ++++++++++++++++++++-
.../internal/bacnetip/UDPCommunicationsModule.go | 40 +--
7 files changed, 381 insertions(+), 64 deletions(-)
diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go
index 1dcaf01551..2ddccb2843 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -1702,8 +1702,9 @@ func (s *StateMachineAccessPoint) Confirmation(apdu _PDU) error { // TODO: note
case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly, readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly:
// find the client transaction this is acking
var tr *ClientSSM
- for _, tr := range s.clientTransactions {
- if apdu.(interface{ GetOriginalInvokeId() uint8 }).GetOriginalInvokeId() == tr.invokeId && pduSource.Equals(tr.pduAddress) {
+ for _, _tr := range s.clientTransactions {
+ if _apdu.(interface{ GetOriginalInvokeId() uint8 }).GetOriginalInvokeId() == _tr.invokeId && pduSource.Equals(_tr.pduAddress) {
+ tr = _tr
break
}
}
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 5d0cd114b9..fc8822299d 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -365,7 +365,7 @@ func (a *ApplicationIOController) _AppComplete(address *Address, apdu _PDU) erro
}
// this request is complete
- switch apdu.(type) {
+ switch apdu.GetMessage().(type) {
case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly:
if err := queue.CompleteIO(queue.activeIOCB, apdu); err != nil {
return err
@@ -420,7 +420,7 @@ func (a *ApplicationIOController) Confirmation(apdu _PDU) error {
log.Debug().Msgf("Confirmation\n%s", apdu)
// this is an ack, error, reject or abort
- return a._AppComplete(apdu.GetPDUDestination(), apdu)
+ return a._AppComplete(apdu.GetPDUSource(), apdu)
}
type BIPSimpleApplication struct {
diff --git a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
index 32c94ad301..693efbf314 100644
--- a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
+++ b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
@@ -68,7 +68,7 @@ func (m *_MultiplexServer) Indication(pdu _PDU) error {
}
type UDPMultiplexer struct {
- address Address
+ address *Address
addrTuple *AddressTuple[string, uint16]
addrBroadcastTuple *AddressTuple[string, uint16]
direct *_MultiplexClient
@@ -87,21 +87,21 @@ func NewUDPMultiplexer(address interface{}, noBroadcast bool) (*UDPMultiplexer,
specialBroadcast := false
if address == nil {
address, _ := NewAddress()
- u.address = *address
+ u.address = address
u.addrTuple = &AddressTuple[string, uint16]{"", 47808}
u.addrBroadcastTuple = &AddressTuple[string, uint16]{"255.255.255.255", 47808}
} else {
// allow the address to be cast
if caddress, ok := address.(*Address); ok {
- u.address = *caddress
- } else if caddress, ok := address.(Address); ok {
u.address = caddress
+ } else if caddress, ok := address.(Address); ok {
+ u.address = &caddress
} else {
newAddress, err := NewAddress(address)
if err != nil {
return nil, errors.Wrap(err, "error parsing address")
}
- u.address = *newAddress
+ u.address = newAddress
}
// promote the normal and broadcast tuples
@@ -210,7 +210,33 @@ func (m *UDPMultiplexer) Confirmation(client *_MultiplexClient, pdu _PDU) error
return nil
}
- // TODO: it is getting to messy, we need to solve the source destination topic
+ // TODO: upstream this is a tuple, but we don't have that here, so we work with what we have
+ src := pduSource
+ var dest *Address
+
+ // match the destination in case the stack needs it
+ if client == m.direct {
+ log.Debug().Msg("direct to us")
+ dest = m.address
+ } else if client == m.broadcast {
+ log.Debug().Msg("broadcast to us")
+ dest = NewLocalBroadcast(nil)
+ } else {
+ return errors.New("Confirmation missmatch")
+ }
+ log.Debug().Msgf("dest: %s", dest)
+
+ // must have at least one octet
+ if pdu.GetMessage() == nil {
+ log.Debug().Msg("no data")
+ return nil
+ }
+
+ // TODO: we only support 0x81 at the moment
+ if m.annexJ != nil {
+ return m.annexJ.Response(NewPDU(pdu.GetMessage(), WithPDUSource(src), WithPDUDestination(dest)))
+ }
+
return nil
}
@@ -242,7 +268,7 @@ func (b *AnnexJCodec) Indication(pdu _PDU) error {
func (b *AnnexJCodec) Confirmation(pdu _PDU) error {
// Note: our BVLC are all annexJ at the moment
- return b.Request(pdu)
+ return b.Response(pdu)
}
type _BIPSAP interface {
@@ -347,14 +373,14 @@ func (b *BIPSimple) Confirmation(pdu _PDU) error {
return b.SapRequest(pdu)
case readWriteModel.BVLCOriginalUnicastNPDUExactly:
// build a vanilla PDU
- xpdu := NewPDU(msg, WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(pdu.GetPDUDestination()))
+ xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(pdu.GetPDUDestination()))
log.Debug().Msgf("xpdu: %s", xpdu)
// send it upstream
return b.Response(xpdu)
case readWriteModel.BVLCOriginalBroadcastNPDUExactly:
// build a PDU with a local broadcast address
- xpdu := NewPDU(msg, WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(NewLocalBroadcast(nil)))
+ xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(NewLocalBroadcast(nil)))
log.Debug().Msgf("xpdu: %s", xpdu)
// send it upstream
@@ -367,7 +393,7 @@ func (b *BIPSimple) Confirmation(pdu _PDU) error {
if err != nil {
return errors.Wrap(err, "error building a ip")
}
- xpdu := NewPDU(msg, WithPDUSource(source), WithPDUDestination(NewLocalBroadcast(nil)))
+ xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(source), WithPDUDestination(NewLocalBroadcast(nil)))
log.Debug().Msgf("xpdu: %s", xpdu)
// send it upstream
diff --git a/plc4go/internal/bacnetip/IOCBModule.go b/plc4go/internal/bacnetip/IOCBModule.go
index a886eb212b..bf1c39da5c 100644
--- a/plc4go/internal/bacnetip/IOCBModule.go
+++ b/plc4go/internal/bacnetip/IOCBModule.go
@@ -26,7 +26,6 @@ import (
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
- "net"
"sync"
"time"
)
@@ -89,7 +88,7 @@ type _IOCB interface {
Trigger()
setIOError(err error)
getRequest() _PDU
- getDestination() net.Addr
+ getDestination() *Address
getPriority() int
clearQueue()
Abort(err error) error
@@ -101,7 +100,7 @@ var _identLock sync.Mutex
type IOCB struct {
ioID int
request _PDU
- destination net.Addr
+ destination *Address
ioState IOCBState
ioResponse _PDU
ioError error
@@ -115,7 +114,7 @@ type IOCB struct {
priority int
}
-func NewIOCB(request _PDU, destination net.Addr) (*IOCB, error) {
+func NewIOCB(request _PDU, destination *Address) (*IOCB, error) {
// lock the identity sequence number
_identLock.Lock()
@@ -276,7 +275,7 @@ func (i *IOCB) getRequest() _PDU {
return i.request
}
-func (i *IOCB) getDestination() net.Addr {
+func (i *IOCB) getDestination() *Address {
return i.destination
}
@@ -793,17 +792,16 @@ func (i *IOQController) _waitTrigger() error {
stateLog.Debug().Msgf("%s %s %s", time.Now(), i.name, "idle")
// look for more to do
- i._trigger()
- return nil
+ return i._trigger()
}
type SieveQueue struct {
*IOQController
requestFn func(apdu _PDU)
- address net.Addr
+ address *Address
}
-func NewSieveQueue(fn func(apdu _PDU), address net.Addr) (*SieveQueue, error) {
+func NewSieveQueue(fn func(apdu _PDU), address *Address) (*SieveQueue, error) {
s := &SieveQueue{}
var err error
s.IOQController, err = NewIOQController(address.String(), s)
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 5fe764b7dd..b1df57d4a3 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -102,11 +102,11 @@ func (m *ApplicationLayerMessageCodec) IsRunning() bool {
}
func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
- address, err2 := NewAddress(m.remoteAddress)
- if err2 != nil {
- panic(err2)
+ address, err := NewAddress(m.remoteAddress)
+ if err != nil {
+ return err
}
- iocb, err := NewIOCB(NewPDU(message, WithPDUDestination(address)), m.remoteAddress)
+ iocb, err := NewIOCB(NewPDU(message, WithPDUDestination(address)), address)
if err != nil {
return errors.Wrap(err, "error creating IOCB")
}
@@ -115,10 +115,10 @@ func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
iocb.Wait()
if iocb.ioError != nil {
// TODO: handle error
- println(iocb.ioError)
+ fmt.Printf("Err: %v\n", iocb.ioError)
} else if iocb.ioResponse != nil {
// TODO: response?
- println(iocb.ioResponse)
+ fmt.Printf("Response: %v\n", iocb.ioResponse)
} else {
// TODO: what now?
}
diff --git a/plc4go/internal/bacnetip/NetworkService.go b/plc4go/internal/bacnetip/NetworkService.go
index e60d1c1105..4a44fcbb93 100644
--- a/plc4go/internal/bacnetip/NetworkService.go
+++ b/plc4go/internal/bacnetip/NetworkService.go
@@ -20,6 +20,7 @@
package bacnetip
import (
+ "bytes"
"fmt"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/pkg/errors"
@@ -488,15 +489,39 @@ func (n *NetworkServiceAccessPoint) ProcessNPDU(adapter *NetworkAdapter, pdu _PD
return errors.New("no adapters")
}
- npdu := pdu.GetMessage().(readWriteModel.NPDU)
+ var (
+ processLocally bool
+ forwardMessage bool
+ )
- var snet *uint16
- // check for source routing
+ npdu := pdu.GetMessage().(readWriteModel.NPDU)
+ sourceAddress := &Address{AddrType: NULL_ADDRESS}
if npdu.GetControl().GetSourceSpecified() {
+ snet := npdu.GetSourceNetworkAddress()
+ sadr := npdu.GetSourceAddress()
+ var err error
+ sourceAddress, err = NewAddress(snet, sadr)
+ if err != nil {
+ return errors.Wrapf(err, "error parsing source address %x", sadr)
+ }
+ }
+ destinationAddress := &Address{AddrType: NULL_ADDRESS}
+ if npdu.GetControl().GetDestinationSpecified() {
+ dnet := npdu.GetDestinationNetworkAddress()
+ dadr := npdu.GetDestinationAddress()
+ var err error
+ destinationAddress, err = NewAddress(dnet, dadr)
+ if err != nil {
+ return errors.Wrapf(err, "error parsing destination address %x", dadr)
+ }
+ }
+ switch {
+ // check for source routing
+ case npdu.GetControl().GetSourceSpecified() && sourceAddress.AddrType != NULL_ADDRESS:
log.Debug().Msg("check source path")
// see if this is attempting to spoof a directly connected network
- snet = npdu.GetSourceNetworkAddress()
+ snet := npdu.GetSourceNetworkAddress()
if _, ok := n.adapters[snet]; !ok {
log.Warn().Msg("path error (1)")
return nil
@@ -504,24 +529,291 @@ func (n *NetworkServiceAccessPoint) ProcessNPDU(adapter *NetworkAdapter, pdu _PD
// pass this new path along to the cache
n.routerInfoCache.UpdateRouterStatus(adapter.adapterNet, pdu.GetPDUSource(), []*uint16{snet})
- }
-
- var (
- processLocally bool
- forwardMessage bool
- )
// check for destination routing
- if npdu.GetControl().GetDestinationSpecified() {
+ case !npdu.GetControl().GetDestinationSpecified() || destinationAddress.AddrType == NULL_ADDRESS:
log.Debug().Msg("no DADR")
processLocally = adapter == n.localAdapter || npdu.GetControl().GetMessageTypeFieldPresent()
forwardMessage = false
+ case destinationAddress.AddrType == REMOTE_BROADCAST_ADDRESS:
+ log.Debug().Msg("DADR is remote broadcast")
+
+ if *destinationAddress.AddrNet == *adapter.adapterNet {
+ log.Warn().Msg("path error (2)")
+ return nil
+ }
+
+ processLocally = *destinationAddress.AddrNet == *n.localAdapter.adapterNet
+ forwardMessage = true
+ case destinationAddress.AddrType == REMOTE_STATION_ADDRESS:
+ log.Debug().Msg("DADR is remote station")
+
+ if *destinationAddress.AddrNet == *adapter.adapterNet {
+ log.Warn().Msg("path error (3)")
+ return nil
+ }
+
+ processLocally = *destinationAddress.AddrNet == *n.localAdapter.adapterNet && bytes.Compare(destinationAddress.AddrAddress, n.localAdapter.adapterAddr.AddrAddress) == 0
+ forwardMessage = !processLocally
+ case destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS:
+ log.Debug().Msg("DADR is global broadcast")
+
+ processLocally = true
+ forwardMessage = true
+ default:
+ log.Warn().Msgf("invalid destination address type: %s", destinationAddress.AddrType)
+ return nil
+ }
+
+ log.Debug().Msgf("processLocally: %t", processLocally)
+ log.Debug().Msgf("forwardMessage: %t", forwardMessage)
+
+ // application or network layer message
+ if !npdu.GetControl().GetMessageTypeFieldPresent() {
+ log.Debug().Msg("application layer message")
+
+ // decode as a generic APDU
+ apdu := NewPDU(npdu.GetApdu())
+ log.Debug().Msgf("apdu:\n%s", apdu)
+
+ // see if it needs to look routed
+ if len(n.adapters) > 1 && adapter != n.localAdapter {
+ // combine the source address
+ if !npdu.GetControl().GetSourceSpecified() {
+ remoteStationAddress, err := NewAddress(adapter.adapterNet, pdu.GetPDUSource().AddrAddress)
+ if err != nil {
+ return errors.Wrap(err, "error creating remote address")
+ }
+ apdu.pduSource = remoteStationAddress
+ } else {
+ apdu.pduSource = sourceAddress
+ }
+ if settings.RouteAware {
+ apdu.pduSource.AddrRoute = pdu.GetPDUSource()
+ }
+
+ // map the destination
+ if !npdu.GetControl().GetDestinationSpecified() {
+ apdu.pduDestination = n.localAdapter.adapterAddr
+ } else if destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS {
+ apdu.pduDestination = NewGlobalBroadcast(nil)
+ } else if destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS {
+ apdu.pduDestination = NewLocalBroadcast(nil)
+ } else {
+ apdu.pduDestination = n.localAdapter.adapterAddr
+ }
+ } else {
+ // combine the source address
+ if npdu.GetControl().GetSourceSpecified() {
+ apdu.pduSource = sourceAddress
+ if settings.RouteAware {
+ log.Debug().Msg("adding route")
+ apdu.pduSource = pdu.GetPDUSource()
+ }
+ } else {
+ apdu.pduSource = pdu.GetPDUSource()
+ }
+
+ // pass along global broadcast
+ if npdu.GetControl().GetDestinationSpecified() && destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS {
+ apdu.pduDestination = NewGlobalBroadcast(nil)
+ } else {
+ apdu.pduDestination = pdu.GetPDUDestination()
+ }
+ }
+
+ log.Debug().Msgf("apdu.pduSource: %s", apdu.pduSource)
+ log.Debug().Msgf("apdu.pduDestination: %s", apdu.pduDestination)
+
+ if err := n.Response(apdu); err != nil {
+ return errors.Wrap(err, "error passing response")
+ }
+ } else {
+ log.Debug().Msg("network layer message")
+
+ if processLocally {
+ log.Debug().Msg("processing NPDU locally")
+
+ // pass to the service element
+ // TODO: how to pass the adapter???
+ if err := n.SapRequest(pdu); err != nil {
+ return errors.Wrap(err, "error passing sap _request")
+ }
+ }
+ }
+
+ // might not need to forward this to other devices
+ if !forwardMessage {
+ log.Debug().Msg("no forwarding")
+ return nil
+ }
+
+ // make sure we're really a router
+ if len(n.adapters) == 1 {
+ log.Debug().Msg("not a router")
+ return nil
+ }
+
+ // make sure it hasn't looped
+ if npdu.GetHopCount() != nil && *npdu.GetHopCount() == 0 {
+ log.Debug().Msg("no more hops")
+ return nil
+ }
+
+ // build a new NPDU to send to other adapters
+ newpdu := NewPDUFromPDU(pdu)
+
+ // decrease the hop count
+ newNpduHopCount := *npdu.GetHopCount() - 1
+
+ // set the source address
+ var newSADR *Address
+ if !npdu.GetControl().GetSourceSpecified() {
+ var err error
+ newSADR, err = NewRemoteStation(adapter.adapterNet, sourceAddress.AddrAddress, nil)
+ if err != nil {
+ return errors.Wrap(err, "error creating remote station")
+ }
+ } else {
+ newSADR = destinationAddress
+ }
+
+ var newDADR *Address
+ // If this is a broadcast it goes everywhere
+ if destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS {
+ log.Debug().Msg("global broadcasting")
+
+ newDADR = NewLocalBroadcast(nil)
+ newSADRLength := uint8(len(newSADR.AddrAddress))
+ newDADRLength := uint8(len(newDADR.AddrAddress))
+ newpdu.pduUserData = readWriteModel.NewNPDU(
+ npdu.GetProtocolVersionNumber(),
+ readWriteModel.NewNPDUControl(
+ false,
+ true,
+ true,
+ false,
+ readWriteModel.NPDUNetworkPriority_NORMAL_MESSAGE,
+ ),
+ newDADR.AddrNet,
+ &newDADRLength,
+ newDADR.AddrAddress,
+ newSADR.AddrNet,
+ &newSADRLength,
+ newSADR.AddrAddress,
+ &newNpduHopCount,
+ nil,
+ npdu.GetApdu(),
+ 0,
+ )
+
+ for _, xadapter := range n.adapters {
+ if xadapter != adapter {
+ if err := xadapter.ProcessNPDU(NewPDUFromPDU(newpdu)); err != nil {
+ log.Warn().Err(err).Msg("Error processing npdu")
+ }
+ }
+ }
+ return nil
+ }
+
+ if destinationAddress.AddrType == REMOTE_BROADCAST_ADDRESS || destinationAddress.AddrType == REMOTE_STATION_ADDRESS {
+ dnet := destinationAddress.AddrNet
+ log.Debug().Msg("remote station/broadcast")
+
+ // see if this is a locally connected network
+ if xadapter, ok := n.adapters[dnet]; ok {
+ if xadapter == adapter {
+ log.Debug().Msg("path error (4)")
+ return nil
+ }
+ log.Debug().Msgf("found path via %v", adapter)
+
+ // if this was a remote broadcast, it's now a local one
+ if destinationAddress.AddrType == REMOTE_BROADCAST_ADDRESS {
+ newDADR = NewLocalBroadcast(nil)
+ } else {
+ var err error
+ newDADR, err = NewLocalStation(destinationAddress.AddrAddress, nil)
+ if err != nil {
+ return errors.Wrap(err, "error building local station")
+ }
+ }
+
+ // last leg in routing
+ newDADR = nil
+
+ // send the packet downstream
+ newSADRLength := uint8(len(newSADR.AddrAddress))
+ newpdu.pduUserData = readWriteModel.NewNPDU(
+ npdu.GetProtocolVersionNumber(),
+ readWriteModel.NewNPDUControl(
+ false,
+ false,
+ true,
+ false,
+ readWriteModel.NPDUNetworkPriority_NORMAL_MESSAGE,
+ ),
+ nil,
+ nil,
+ nil,
+ newSADR.AddrNet,
+ &newSADRLength,
+ newSADR.AddrAddress,
+ &newNpduHopCount,
+ nil,
+ npdu.GetApdu(),
+ 0,
+ )
+
+ return xadapter.ProcessNPDU(NewPDUFromPDU(newpdu))
+ }
+
+ // look for routing information from the network of one of our adapters to the destination network
+ var routerInfo *RouterInfo
+ var snetAdapter *NetworkAdapter
+ for snet, _snetAdapter := range n.adapters {
+ if _routerInfo := n.routerInfoCache.GetRouterInfo(snet, dnet); _routerInfo != nil {
+ routerInfo = _routerInfo
+ snetAdapter = _snetAdapter
+ break
+ }
+ }
+
+ // found a path
+ if routerInfo != nil {
+ log.Debug().Msgf("found path via %v", routerInfo)
+
+ // the destination is the address of the router
+ pduDestination := routerInfo.address
+
+ // send the packet downstream
+ return snetAdapter.ProcessNPDU(NewPDUFromPDU(newpdu, WithPDUDestination(&pduDestination)))
+ }
+
+ log.Debug().Msg("No router info found")
+
+ // try to find a path to the network
+ xnpdu := readWriteModel.NewNLMWhoIsRouterToNetwork(dnet, 0)
+ pduDestination := NewLocalBroadcast(nil)
+
+ // send it to all of the connected adapters
+ for _, xadapter := range n.adapters {
+ // skip the horse it rode in on
+ if xadapter == adapter {
+ continue
+ }
+
+ // pass this along as if it came from the NSE
+ if err := n.SapIndicationWithAdapter(xadapter, NewPDU(xnpdu, WithPDUDestination(pduDestination))); err != nil {
+ return errors.Wrap(err, "error sending indication")
+ }
+ }
+
+ return nil
}
- // TODO: we need the type from the DADR which we don't have in our readwrite.NPDU so we might need a special _NPDU
- panic("implement me")
- _ = processLocally
- _ = forwardMessage
+ log.Debug().Msgf("bad DADR: %v:%v", npdu.GetDestinationNetworkAddress(), npdu.GetDestinationAddress())
return nil
}
diff --git a/plc4go/internal/bacnetip/UDPCommunicationsModule.go b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
index b39150ef26..231b0b5ce0 100644
--- a/plc4go/internal/bacnetip/UDPCommunicationsModule.go
+++ b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
@@ -246,25 +246,14 @@ func (d *UDPDirector) Close() error {
func (d *UDPDirector) handleRead() {
log.Debug().Msgf("handleRead(%v)", d.address)
- firstFourBytes := make([]byte, 4)
- if read, err := d.udpConn.Read(firstFourBytes); err != nil {
+ readBytes := make([]byte, 1500) // TODO: check if that is sufficient
+ var sourceAddr *net.UDPAddr
+ if _, addr, err := d.udpConn.ReadFromUDP(readBytes); err != nil {
log.Error().Err(err).Msg("error reading")
return
- } else if read != 4 {
- log.Error().Msgf("Not enough data %d", read)
- return
- }
-
- length := uint32(firstFourBytes[2])<<8 | uint32(firstFourBytes[3])
- remainingMessage := make([]byte, length-4)
- if read, err := d.udpConn.Read(remainingMessage); err != nil {
- log.Error().Err(err).Msg("error reading")
- return
- } else if read != int(length-4) {
- log.Error().Msgf("Not enough data: actual: %d, wanted: %d", read, length-4)
- return
+ } else {
+ sourceAddr = addr
}
- readBytes := append(firstFourBytes, remainingMessage...)
bvlc, err := model.BVLCParse(readBytes)
if err != nil {
@@ -273,8 +262,19 @@ func (d *UDPDirector) handleRead() {
return
}
- // TODO: how to get the addr? Maybe we ditch the transport instance and use the udp socket directly
- pdu := NewPDU(bvlc)
+ saddr, err := NewAddress(sourceAddr)
+ if err != nil {
+ // pass along to a handler
+ d.handleError(errors.Wrap(err, "error parsing source address"))
+ return
+ }
+ daddr, err := NewAddress(d.udpConn.LocalAddr())
+ if err != nil {
+ // pass along to a handler
+ d.handleError(errors.Wrap(err, "error parsing destination address"))
+ return
+ }
+ pdu := NewPDU(bvlc, WithPDUSource(saddr), WithPDUDestination(daddr))
// send the PDU up to the client
go d._response(pdu)
}
@@ -305,12 +305,12 @@ func (d *UDPDirector) _response(pdu _PDU) error {
log.Debug().Msgf("_response %s", pdu)
// get the destination
- addr := pdu.GetPDUDestination()
+ addr := pdu.GetPDUSource()
// get the peer
peer, ok := d.peers[addr.String()]
if !ok {
- peer = d.actorClass(d, (*addr).String())
+ peer = d.actorClass(d, addr.String())
}
// send the message