You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/01/16 14:41:57 UTC

[plc4x] branch develop updated: feat(plc4go/bacnet): basic comm working

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 574dd3f633 feat(plc4go/bacnet): basic comm working
574dd3f633 is described below

commit 574dd3f633a8c256f03bb35554eb5b517275540a
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Jan 16 15:41:49 2023 +0100

    feat(plc4go/bacnet): basic comm working
---
 plc4go/internal/bacnetip/ApplicationLayer.go       |   5 +-
 plc4go/internal/bacnetip/ApplicationModule.go      |   4 +-
 .../bacnetip/BACnetVirtualLinkLayerService.go      |  46 ++-
 plc4go/internal/bacnetip/IOCBModule.go             |  16 +-
 plc4go/internal/bacnetip/MessageCodec.go           |  12 +-
 plc4go/internal/bacnetip/NetworkService.go         | 322 ++++++++++++++++++++-
 .../internal/bacnetip/UDPCommunicationsModule.go   |  40 +--
 7 files changed, 381 insertions(+), 64 deletions(-)

diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go
index 1dcaf01551..2ddccb2843 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -1702,8 +1702,9 @@ func (s *StateMachineAccessPoint) Confirmation(apdu _PDU) error { // TODO: note
 	case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly, readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly:
 		// find the client transaction this is acking
 		var tr *ClientSSM
-		for _, tr := range s.clientTransactions {
-			if apdu.(interface{ GetOriginalInvokeId() uint8 }).GetOriginalInvokeId() == tr.invokeId && pduSource.Equals(tr.pduAddress) {
+		for _, _tr := range s.clientTransactions {
+			if _apdu.(interface{ GetOriginalInvokeId() uint8 }).GetOriginalInvokeId() == _tr.invokeId && pduSource.Equals(_tr.pduAddress) {
+				tr = _tr
 				break
 			}
 		}
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 5d0cd114b9..fc8822299d 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -365,7 +365,7 @@ func (a *ApplicationIOController) _AppComplete(address *Address, apdu _PDU) erro
 	}
 
 	// this request is complete
-	switch apdu.(type) {
+	switch apdu.GetMessage().(type) {
 	case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly:
 		if err := queue.CompleteIO(queue.activeIOCB, apdu); err != nil {
 			return err
@@ -420,7 +420,7 @@ func (a *ApplicationIOController) Confirmation(apdu _PDU) error {
 	log.Debug().Msgf("Confirmation\n%s", apdu)
 
 	// this is an ack, error, reject or abort
-	return a._AppComplete(apdu.GetPDUDestination(), apdu)
+	return a._AppComplete(apdu.GetPDUSource(), apdu)
 }
 
 type BIPSimpleApplication struct {
diff --git a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
index 32c94ad301..693efbf314 100644
--- a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
+++ b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
@@ -68,7 +68,7 @@ func (m *_MultiplexServer) Indication(pdu _PDU) error {
 }
 
 type UDPMultiplexer struct {
-	address            Address
+	address            *Address
 	addrTuple          *AddressTuple[string, uint16]
 	addrBroadcastTuple *AddressTuple[string, uint16]
 	direct             *_MultiplexClient
@@ -87,21 +87,21 @@ func NewUDPMultiplexer(address interface{}, noBroadcast bool) (*UDPMultiplexer,
 	specialBroadcast := false
 	if address == nil {
 		address, _ := NewAddress()
-		u.address = *address
+		u.address = address
 		u.addrTuple = &AddressTuple[string, uint16]{"", 47808}
 		u.addrBroadcastTuple = &AddressTuple[string, uint16]{"255.255.255.255", 47808}
 	} else {
 		// allow the address to be cast
 		if caddress, ok := address.(*Address); ok {
-			u.address = *caddress
-		} else if caddress, ok := address.(Address); ok {
 			u.address = caddress
+		} else if caddress, ok := address.(Address); ok {
+			u.address = &caddress
 		} else {
 			newAddress, err := NewAddress(address)
 			if err != nil {
 				return nil, errors.Wrap(err, "error parsing address")
 			}
-			u.address = *newAddress
+			u.address = newAddress
 		}
 
 		// promote the normal and broadcast tuples
@@ -210,7 +210,33 @@ func (m *UDPMultiplexer) Confirmation(client *_MultiplexClient, pdu _PDU) error
 		return nil
 	}
 
-	// TODO: it is getting to messy, we need to solve the source destination topic
+	// TODO: upstream this is a tuple but we don't have that here so we can work with what we have
+	src := pduSource
+	var dest *Address
+
+	// match the destination in case the stack needs it
+	if client == m.direct {
+		log.Debug().Msg("direct to us")
+		dest = m.address
+	} else if client == m.broadcast {
+		log.Debug().Msg("broadcast to us")
+		dest = NewLocalBroadcast(nil)
+	} else {
+		return errors.New("Confirmation missmatch")
+	}
+	log.Debug().Msgf("dest: %s", dest)
+
+	// must have at least one octet
+	if pdu.GetMessage() == nil {
+		log.Debug().Msg("no data")
+		return nil
+	}
+
+	// TODO: we only support 0x81 at the moment
+	if m.annexJ != nil {
+		return m.annexJ.Response(NewPDU(pdu.GetMessage(), WithPDUSource(src), WithPDUDestination(dest)))
+	}
+
 	return nil
 }
 
@@ -242,7 +268,7 @@ func (b *AnnexJCodec) Indication(pdu _PDU) error {
 
 func (b *AnnexJCodec) Confirmation(pdu _PDU) error {
 	// Note: our BVLC are all annexJ at the moment
-	return b.Request(pdu)
+	return b.Response(pdu)
 }
 
 type _BIPSAP interface {
@@ -347,14 +373,14 @@ func (b *BIPSimple) Confirmation(pdu _PDU) error {
 		return b.SapRequest(pdu)
 	case readWriteModel.BVLCOriginalUnicastNPDUExactly:
 		// build a vanilla PDU
-		xpdu := NewPDU(msg, WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(pdu.GetPDUDestination()))
+		xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(pdu.GetPDUDestination()))
 		log.Debug().Msgf("xpdu: %s", xpdu)
 
 		// send it upstream
 		return b.Response(xpdu)
 	case readWriteModel.BVLCOriginalBroadcastNPDUExactly:
 		// build a PDU with a local broadcast address
-		xpdu := NewPDU(msg, WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(NewLocalBroadcast(nil)))
+		xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(pdu.GetPDUSource()), WithPDUDestination(NewLocalBroadcast(nil)))
 		log.Debug().Msgf("xpdu: %s", xpdu)
 
 		// send it upstream
@@ -367,7 +393,7 @@ func (b *BIPSimple) Confirmation(pdu _PDU) error {
 		if err != nil {
 			return errors.Wrap(err, "error building a ip")
 		}
-		xpdu := NewPDU(msg, WithPDUSource(source), WithPDUDestination(NewLocalBroadcast(nil)))
+		xpdu := NewPDU(msg.GetNpdu(), WithPDUSource(source), WithPDUDestination(NewLocalBroadcast(nil)))
 		log.Debug().Msgf("xpdu: %s", xpdu)
 
 		// send it upstream
diff --git a/plc4go/internal/bacnetip/IOCBModule.go b/plc4go/internal/bacnetip/IOCBModule.go
index a886eb212b..bf1c39da5c 100644
--- a/plc4go/internal/bacnetip/IOCBModule.go
+++ b/plc4go/internal/bacnetip/IOCBModule.go
@@ -26,7 +26,6 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
-	"net"
 	"sync"
 	"time"
 )
@@ -89,7 +88,7 @@ type _IOCB interface {
 	Trigger()
 	setIOError(err error)
 	getRequest() _PDU
-	getDestination() net.Addr
+	getDestination() *Address
 	getPriority() int
 	clearQueue()
 	Abort(err error) error
@@ -101,7 +100,7 @@ var _identLock sync.Mutex
 type IOCB struct {
 	ioID           int
 	request        _PDU
-	destination    net.Addr
+	destination    *Address
 	ioState        IOCBState
 	ioResponse     _PDU
 	ioError        error
@@ -115,7 +114,7 @@ type IOCB struct {
 	priority       int
 }
 
-func NewIOCB(request _PDU, destination net.Addr) (*IOCB, error) {
+func NewIOCB(request _PDU, destination *Address) (*IOCB, error) {
 	// lock the identity sequence number
 	_identLock.Lock()
 
@@ -276,7 +275,7 @@ func (i *IOCB) getRequest() _PDU {
 	return i.request
 }
 
-func (i *IOCB) getDestination() net.Addr {
+func (i *IOCB) getDestination() *Address {
 	return i.destination
 }
 
@@ -793,17 +792,16 @@ func (i *IOQController) _waitTrigger() error {
 	stateLog.Debug().Msgf("%s %s %s", time.Now(), i.name, "idle")
 
 	// look for more to do
-	i._trigger()
-	return nil
+	return i._trigger()
 }
 
 type SieveQueue struct {
 	*IOQController
 	requestFn func(apdu _PDU)
-	address   net.Addr
+	address   *Address
 }
 
-func NewSieveQueue(fn func(apdu _PDU), address net.Addr) (*SieveQueue, error) {
+func NewSieveQueue(fn func(apdu _PDU), address *Address) (*SieveQueue, error) {
 	s := &SieveQueue{}
 	var err error
 	s.IOQController, err = NewIOQController(address.String(), s)
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 5fe764b7dd..b1df57d4a3 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -102,11 +102,11 @@ func (m *ApplicationLayerMessageCodec) IsRunning() bool {
 }
 
 func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
-	address, err2 := NewAddress(m.remoteAddress)
-	if err2 != nil {
-		panic(err2)
+	address, err := NewAddress(m.remoteAddress)
+	if err != nil {
+		return err
 	}
-	iocb, err := NewIOCB(NewPDU(message, WithPDUDestination(address)), m.remoteAddress)
+	iocb, err := NewIOCB(NewPDU(message, WithPDUDestination(address)), address)
 	if err != nil {
 		return errors.Wrap(err, "error creating IOCB")
 	}
@@ -115,10 +115,10 @@ func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
 		iocb.Wait()
 		if iocb.ioError != nil {
 			// TODO: handle error
-			println(iocb.ioError)
+			fmt.Printf("Err: %v\n", iocb.ioError)
 		} else if iocb.ioResponse != nil {
 			// TODO: response?
-			println(iocb.ioResponse)
+			fmt.Printf("Response: %v\n", iocb.ioResponse)
 		} else {
 			// TODO: what now?
 		}
diff --git a/plc4go/internal/bacnetip/NetworkService.go b/plc4go/internal/bacnetip/NetworkService.go
index e60d1c1105..4a44fcbb93 100644
--- a/plc4go/internal/bacnetip/NetworkService.go
+++ b/plc4go/internal/bacnetip/NetworkService.go
@@ -20,6 +20,7 @@
 package bacnetip
 
 import (
+	"bytes"
 	"fmt"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/pkg/errors"
@@ -488,15 +489,39 @@ func (n *NetworkServiceAccessPoint) ProcessNPDU(adapter *NetworkAdapter, pdu _PD
 		return errors.New("no adapters")
 	}
 
-	npdu := pdu.GetMessage().(readWriteModel.NPDU)
+	var (
+		processLocally bool
+		forwardMessage bool
+	)
 
-	var snet *uint16
-	// check for source routing
+	npdu := pdu.GetMessage().(readWriteModel.NPDU)
+	sourceAddress := &Address{AddrType: NULL_ADDRESS}
 	if npdu.GetControl().GetSourceSpecified() {
+		snet := npdu.GetSourceNetworkAddress()
+		sadr := npdu.GetSourceAddress()
+		var err error
+		sourceAddress, err = NewAddress(snet, sadr)
+		if err != nil {
+			return errors.Wrapf(err, "error parsing source address %x", sadr)
+		}
+	}
+	destinationAddress := &Address{AddrType: NULL_ADDRESS}
+	if npdu.GetControl().GetDestinationSpecified() {
+		dnet := npdu.GetDestinationNetworkAddress()
+		dadr := npdu.GetDestinationAddress()
+		var err error
+		destinationAddress, err = NewAddress(dnet, dadr)
+		if err != nil {
+			return errors.Wrapf(err, "error parsing destination address %x", dadr)
+		}
+	}
+	switch {
+	// check for source routing
+	case npdu.GetControl().GetSourceSpecified() && sourceAddress.AddrType != NULL_ADDRESS:
 		log.Debug().Msg("check source path")
 
 		// see if this is attempting to spoof a directly connected network
-		snet = npdu.GetSourceNetworkAddress()
+		snet := npdu.GetSourceNetworkAddress()
 		if _, ok := n.adapters[snet]; !ok {
 			log.Warn().Msg("path error (1)")
 			return nil
@@ -504,24 +529,291 @@ func (n *NetworkServiceAccessPoint) ProcessNPDU(adapter *NetworkAdapter, pdu _PD
 
 		// pass this new path along to the cache
 		n.routerInfoCache.UpdateRouterStatus(adapter.adapterNet, pdu.GetPDUSource(), []*uint16{snet})
-	}
-
-	var (
-		processLocally bool
-		forwardMessage bool
-	)
 	// check for destination routing
-	if npdu.GetControl().GetDestinationSpecified() {
+	case !npdu.GetControl().GetDestinationSpecified() || destinationAddress.AddrType == NULL_ADDRESS:
 		log.Debug().Msg("no DADR")
 
 		processLocally = adapter == n.localAdapter || npdu.GetControl().GetMessageTypeFieldPresent()
 		forwardMessage = false
+	case destinationAddress.AddrType == REMOTE_BROADCAST_ADDRESS:
+		log.Debug().Msg("DADR is remote broadcast")
+
+		if *destinationAddress.AddrNet == *adapter.adapterNet {
+			log.Warn().Msg("path error (2)")
+			return nil
+		}
+
+		processLocally = *destinationAddress.AddrNet == *n.localAdapter.adapterNet
+		forwardMessage = true
+	case destinationAddress.AddrType == REMOTE_STATION_ADDRESS:
+		log.Debug().Msg("DADR is remote station")
+
+		if *destinationAddress.AddrNet == *adapter.adapterNet {
+			log.Warn().Msg("path error (3)")
+			return nil
+		}
+
+		processLocally = *destinationAddress.AddrNet == *n.localAdapter.adapterNet && bytes.Compare(destinationAddress.AddrAddress, n.localAdapter.adapterAddr.AddrAddress) == 0
+		forwardMessage = !processLocally
+	case destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS:
+		log.Debug().Msg("DADR is global broadcast")
+
+		processLocally = true
+		forwardMessage = true
+	default:
+		log.Warn().Msgf("invalid destination address type: %s", destinationAddress.AddrType)
+		return nil
+	}
+
+	log.Debug().Msgf("processLocally: %t", processLocally)
+	log.Debug().Msgf("forwardMessage: %t", forwardMessage)
+
+	// application or network layer message
+	if !npdu.GetControl().GetMessageTypeFieldPresent() {
+		log.Debug().Msg("application layer message")
+
+		// decode as a generic APDU
+		apdu := NewPDU(npdu.GetApdu())
+		log.Debug().Msgf("apdu:\n%s", apdu)
+
+		// see if it needs to look routed
+		if len(n.adapters) > 1 && adapter != n.localAdapter {
+			// combine the source address
+			if !npdu.GetControl().GetSourceSpecified() {
+				remoteStationAddress, err := NewAddress(adapter.adapterNet, pdu.GetPDUSource().AddrAddress)
+				if err != nil {
+					return errors.Wrap(err, "error creating remote address")
+				}
+				apdu.pduSource = remoteStationAddress
+			} else {
+				apdu.pduSource = sourceAddress
+			}
+			if settings.RouteAware {
+				apdu.pduSource.AddrRoute = pdu.GetPDUSource()
+			}
+
+			// map the destination
+			if !npdu.GetControl().GetDestinationSpecified() {
+				apdu.pduDestination = n.localAdapter.adapterAddr
+			} else if destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS {
+				apdu.pduDestination = NewGlobalBroadcast(nil)
+			} else if destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS {
+				apdu.pduDestination = NewLocalBroadcast(nil)
+			} else {
+				apdu.pduDestination = n.localAdapter.adapterAddr
+			}
+		} else {
+			// combine the source address
+			if npdu.GetControl().GetSourceSpecified() {
+				apdu.pduSource = sourceAddress
+				if settings.RouteAware {
+					log.Debug().Msg("adding route")
+					apdu.pduSource = pdu.GetPDUSource()
+				}
+			} else {
+				apdu.pduSource = pdu.GetPDUSource()
+			}
+
+			// pass along global broadcast
+			if npdu.GetControl().GetDestinationSpecified() && destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS {
+				apdu.pduDestination = NewGlobalBroadcast(nil)
+			} else {
+				apdu.pduDestination = pdu.GetPDUDestination()
+			}
+		}
+
+		log.Debug().Msgf("apdu.pduSource: %s", apdu.pduSource)
+		log.Debug().Msgf("apdu.pduDestination: %s", apdu.pduDestination)
+
+		if err := n.Response(apdu); err != nil {
+			return errors.Wrap(err, "error passing response")
+		}
+	} else {
+		log.Debug().Msg("network layer message")
+
+		if processLocally {
+			log.Debug().Msg("processing NPDU locally")
+
+			// pass to the service element
+			// TODO: how to pass the adapter???
+			if err := n.SapRequest(pdu); err != nil {
+				return errors.Wrap(err, "error passing sap _request")
+			}
+		}
+	}
+
+	// might not need to forward this to other devices
+	if !forwardMessage {
+		log.Debug().Msg("no forwarding")
+		return nil
+	}
+
+	// make sure we're really a router
+	if len(n.adapters) == 1 {
+		log.Debug().Msg("not a router")
+		return nil
+	}
+
+	// make sure it hasn't looped
+	if npdu.GetHopCount() != nil && *npdu.GetHopCount() == 0 {
+		log.Debug().Msg("no more hops")
+		return nil
+	}
+
+	// build a new NPDU to send to other adapters
+	newpdu := NewPDUFromPDU(pdu)
+
+	// decrease the hop count
+	newNpduHopCount := *npdu.GetHopCount() - 1
+
+	// set the source address
+	var newSADR *Address
+	if !npdu.GetControl().GetSourceSpecified() {
+		var err error
+		newSADR, err = NewRemoteStation(adapter.adapterNet, sourceAddress.AddrAddress, nil)
+		if err != nil {
+			return errors.Wrap(err, "error creating remote station")
+		}
+	} else {
+		newSADR = destinationAddress
+	}
+
+	var newDADR *Address
+	// If this is a broadcast it goes everywhere
+	if destinationAddress.AddrType == GLOBAL_BROADCAST_ADDRESS {
+		log.Debug().Msg("global broadcasting")
+
+		newDADR = NewLocalBroadcast(nil)
+		newSADRLength := uint8(len(newSADR.AddrAddress))
+		newDADRLength := uint8(len(newDADR.AddrAddress))
+		newpdu.pduUserData = readWriteModel.NewNPDU(
+			npdu.GetProtocolVersionNumber(),
+			readWriteModel.NewNPDUControl(
+				false,
+				true,
+				true,
+				false,
+				readWriteModel.NPDUNetworkPriority_NORMAL_MESSAGE,
+			),
+			newDADR.AddrNet,
+			&newDADRLength,
+			newDADR.AddrAddress,
+			newSADR.AddrNet,
+			&newSADRLength,
+			newSADR.AddrAddress,
+			&newNpduHopCount,
+			nil,
+			npdu.GetApdu(),
+			0,
+		)
+
+		for _, xadapter := range n.adapters {
+			if xadapter != adapter {
+				if err := xadapter.ProcessNPDU(NewPDUFromPDU(newpdu)); err != nil {
+					log.Warn().Err(err).Msg("Error processing npdu")
+				}
+			}
+		}
+		return nil
+	}
+
+	if destinationAddress.AddrType == REMOTE_BROADCAST_ADDRESS || destinationAddress.AddrType == REMOTE_STATION_ADDRESS {
+		dnet := destinationAddress.AddrNet
+		log.Debug().Msg("remote station/broadcast")
+
+		// see if this is a locally connected network
+		if xadapter, ok := n.adapters[dnet]; ok {
+			if xadapter == adapter {
+				log.Debug().Msg("path error (4)")
+				return nil
+			}
+			log.Debug().Msgf("found path via %v", adapter)
+
+			// if this was a remote broadcast, it's now a local one
+			if destinationAddress.AddrType == REMOTE_BROADCAST_ADDRESS {
+				newDADR = NewLocalBroadcast(nil)
+			} else {
+				var err error
+				newDADR, err = NewLocalStation(destinationAddress.AddrAddress, nil)
+				if err != nil {
+					return errors.Wrap(err, "error building local station")
+				}
+			}
+
+			// last leg in routing
+			newDADR = nil
+
+			// send the packet downstream
+			newSADRLength := uint8(len(newSADR.AddrAddress))
+			newpdu.pduUserData = readWriteModel.NewNPDU(
+				npdu.GetProtocolVersionNumber(),
+				readWriteModel.NewNPDUControl(
+					false,
+					false,
+					true,
+					false,
+					readWriteModel.NPDUNetworkPriority_NORMAL_MESSAGE,
+				),
+				nil,
+				nil,
+				nil,
+				newSADR.AddrNet,
+				&newSADRLength,
+				newSADR.AddrAddress,
+				&newNpduHopCount,
+				nil,
+				npdu.GetApdu(),
+				0,
+			)
+
+			return xadapter.ProcessNPDU(NewPDUFromPDU(newpdu))
+		}
+
+		// look for routing information from the network of one of our adapters to the destination network
+		var routerInfo *RouterInfo
+		var snetAdapter *NetworkAdapter
+		for snet, _snetAdapter := range n.adapters {
+			if _routerInfo := n.routerInfoCache.GetRouterInfo(snet, dnet); _routerInfo != nil {
+				routerInfo = _routerInfo
+				snetAdapter = _snetAdapter
+				break
+			}
+		}
+
+		// found a path
+		if routerInfo != nil {
+			log.Debug().Msgf("found path via %v", routerInfo)
+
+			// the destination is the address of the router
+			pduDestination := routerInfo.address
+
+			//  send the packet downstream
+			return snetAdapter.ProcessNPDU(NewPDUFromPDU(newpdu, WithPDUDestination(&pduDestination)))
+		}
+
+		log.Debug().Msg("No router info found")
+
+		// try to find a path to the network
+		xnpdu := readWriteModel.NewNLMWhoIsRouterToNetwork(dnet, 0)
+		pduDestination := NewLocalBroadcast(nil)
+
+		// send it to all of the connected adapters
+		for _, xadapter := range n.adapters {
+			// skip the horse it rode in on
+			if xadapter == adapter {
+				continue
+			}
+
+			// pass this along as if it came from the NSE
+			if err := n.SapIndicationWithAdapter(xadapter, NewPDU(xnpdu, WithPDUDestination(pduDestination))); err != nil {
+				return errors.Wrap(err, "error sending indication")
+			}
+		}
+
+		return nil
 	}
-	// TODO: we need the type from the DADR which we don't have in our readwrite.NPDU so we might need a special _NPDU
 
-	panic("implement me")
-	_ = processLocally
-	_ = forwardMessage
+	log.Debug().Msgf("bad DADR: %v:%v", npdu.GetDestinationNetworkAddress(), npdu.GetDestinationAddress())
 	return nil
 }
 
diff --git a/plc4go/internal/bacnetip/UDPCommunicationsModule.go b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
index b39150ef26..231b0b5ce0 100644
--- a/plc4go/internal/bacnetip/UDPCommunicationsModule.go
+++ b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
@@ -246,25 +246,14 @@ func (d *UDPDirector) Close() error {
 func (d *UDPDirector) handleRead() {
 	log.Debug().Msgf("handleRead(%v)", d.address)
 
-	firstFourBytes := make([]byte, 4)
-	if read, err := d.udpConn.Read(firstFourBytes); err != nil {
+	readBytes := make([]byte, 1500) // TODO: check if that is sufficient
+	var sourceAddr *net.UDPAddr
+	if _, addr, err := d.udpConn.ReadFromUDP(readBytes); err != nil {
 		log.Error().Err(err).Msg("error reading")
 		return
-	} else if read != 4 {
-		log.Error().Msgf("Not enough data %d", read)
-		return
-	}
-
-	length := uint32(firstFourBytes[2])<<8 | uint32(firstFourBytes[3])
-	remainingMessage := make([]byte, length-4)
-	if read, err := d.udpConn.Read(remainingMessage); err != nil {
-		log.Error().Err(err).Msg("error reading")
-		return
-	} else if read != int(length-4) {
-		log.Error().Msgf("Not enough data: actual: %d, wanted: %d", read, length-4)
-		return
+	} else {
+		sourceAddr = addr
 	}
-	readBytes := append(firstFourBytes, remainingMessage...)
 
 	bvlc, err := model.BVLCParse(readBytes)
 	if err != nil {
@@ -273,8 +262,19 @@ func (d *UDPDirector) handleRead() {
 		return
 	}
 
-	// TODO: how to get the addr? Maybe we ditch the transport instance and use the udp socket directly
-	pdu := NewPDU(bvlc)
+	saddr, err := NewAddress(sourceAddr)
+	if err != nil {
+		// pass along to a handler
+		d.handleError(errors.Wrap(err, "error parsing source address"))
+		return
+	}
+	daddr, err := NewAddress(d.udpConn.LocalAddr())
+	if err != nil {
+		// pass along to a handler
+		d.handleError(errors.Wrap(err, "error parsing destination address"))
+		return
+	}
+	pdu := NewPDU(bvlc, WithPDUSource(saddr), WithPDUDestination(daddr))
 	// send the PDU up to the client
 	go d._response(pdu)
 }
@@ -305,12 +305,12 @@ func (d *UDPDirector) _response(pdu _PDU) error {
 	log.Debug().Msgf("_response %s", pdu)
 
 	// get the destination
-	addr := pdu.GetPDUDestination()
+	addr := pdu.GetPDUSource()
 
 	// get the peer
 	peer, ok := d.peers[addr.String()]
 	if !ok {
-		peer = d.actorClass(d, (*addr).String())
+		peer = d.actorClass(d, addr.String())
 	}
 
 	// send the message