You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/01/10 11:31:59 UTC

[plc4x] 02/02: feat(plc4go/bacnet): update udp comms

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 86b07fb9e9e21dfd7d6801013b60e04750c146aa
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jan 10 12:31:49 2023 +0100

    feat(plc4go/bacnet): update udp comms
---
 .../bacnetip/BACnetVirtualLinkLayerService.go      |  10 +-
 plc4go/internal/bacnetip/Task.go                   |   4 +-
 .../internal/bacnetip/UDPCommunicationsModule.go   | 252 ++++++++++++++++++++-
 3 files changed, 249 insertions(+), 17 deletions(-)

diff --git a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
index 9a467b43cb..32c94ad301 100644
--- a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
+++ b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
@@ -130,7 +130,7 @@ func NewUDPMultiplexer(address interface{}, noBroadcast bool) (*UDPMultiplexer,
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating multiplex client")
 	}
-	u.directPort, err = NewUDPDirector(u.addrTuple, nil, nil, nil, nil)
+	u.directPort, err = NewUDPDirector(*u.addrTuple, nil, nil, nil, nil)
 	if err := bind(u.direct, u.directPort); err != nil {
 		return nil, errors.Wrap(err, "error binding ports")
 	}
@@ -142,7 +142,7 @@ func NewUDPMultiplexer(address interface{}, noBroadcast bool) (*UDPMultiplexer,
 			return nil, errors.Wrap(err, "error creating broadcast multiplex client")
 		}
 		reuse := true
-		u.broadcastPort, err = NewUDPDirector(u.addrBroadcastTuple, nil, &reuse, nil, nil)
+		u.broadcastPort, err = NewUDPDirector(*u.addrBroadcastTuple, nil, &reuse, nil, nil)
 		if err := bind(u.direct, u.directPort); err != nil {
 			return nil, errors.Wrap(err, "error binding ports")
 		}
@@ -236,11 +236,13 @@ func NewAnnexJCodec(cid *int, sid *int) (*AnnexJCodec, error) {
 }
 
 func (b *AnnexJCodec) Indication(pdu _PDU) error {
-	panic("not implemented yet")
+	// Note: our BVLC are all annexJ at the moment
+	return b.Request(pdu)
 }
 
 func (b *AnnexJCodec) Confirmation(pdu _PDU) error {
-	panic("not implemented yet")
+	// Note: our BVLC are all annexJ at the moment
+	return b.Request(pdu)
 }
 
 type _BIPSAP interface {
diff --git a/plc4go/internal/bacnetip/Task.go b/plc4go/internal/bacnetip/Task.go
index 32700aa341..d6ce37ed35 100644
--- a/plc4go/internal/bacnetip/Task.go
+++ b/plc4go/internal/bacnetip/Task.go
@@ -47,7 +47,7 @@ type OneShotDeleteTask struct {
 	_Task
 }
 
-func FunctionTask(func()) _Task {
+func FunctionTask(func()) *_Task {
 	// TODO: implement me
-	return _Task{}
+	return &_Task{}
 }
diff --git a/plc4go/internal/bacnetip/UDPCommunicationsModule.go b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
index 43714a3ebd..5eb1c302a7 100644
--- a/plc4go/internal/bacnetip/UDPCommunicationsModule.go
+++ b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
@@ -20,38 +20,268 @@
 package bacnetip
 
 import (
+	"bufio"
+	"fmt"
+	"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+	"github.com/apache/plc4x/plc4go/spi/transports/udp"
 	"github.com/pkg/errors"
+	"github.com/rs/zerolog/log"
+	"net"
+	"time"
 )
 
 type UDPActor struct {
+	director *UDPDirector
+	timeout  uint32
+	timer    *_Task
+	peer     string
 }
 
-type UDPPickleActor struct {
-	*UDPActor
+func NewUDPActor(director *UDPDirector, peer string) *UDPActor {
+	a := &UDPActor{}
+
+	// keep track of the director
+	a.director = director
+
+	// associated with a peer
+	a.peer = peer
+
+	// Add a timer
+	a.timeout = director.timeout
+	if a.timeout > 0 {
+		a.timer = FunctionTask(a.idleTimeout)
+		when := time.Now().Add(time.Duration(a.timeout) * time.Millisecond)
+		a.timer.InstallTask(&when, nil)
+	}
+
+	// tell the director this is a new actor
+	a.director.AddActor(a)
+	return a
+}
+
+func (a *UDPActor) idleTimeout() {
+	log.Debug().Msg("idleTimeout")
+
+	// tell the director this is gone
+	a.director.DelActor(a)
+}
+
+func (a *UDPActor) Indication(pdu _PDU) error {
+	log.Debug().Msgf("Indication %s", pdu)
+
+	// reschedule the timer
+	if a.timer != nil {
+		when := time.Now().Add(time.Duration(a.timeout) * time.Millisecond)
+		a.timer.InstallTask(&when, nil)
+	}
+
+	// put it in the outbound queue for the director
+	a.director.request <- pdu
+	return nil
+}
+
+func (a *UDPActor) Response(pdu _PDU) error {
+	log.Debug().Msgf("Response %s", pdu)
+
+	// reschedule the timer
+	if a.timer != nil {
+		when := time.Now().Add(time.Duration(a.timeout) * time.Millisecond)
+		a.timer.InstallTask(&when, nil)
+	}
+
+	// process this as a response from the director
+	return a.director.Response(pdu)
+}
+
+func (a *UDPActor) HandleError(err error) {
+	log.Debug().Err(err).Msg("HandleError")
+
+	if err != nil {
+		a.director.ActorError(err)
+	}
 }
 
-// TODO: finish me
 type UDPDirector struct {
 	*Server
 	*ServiceAccessPoint
-}
 
-func (d *UDPDirector) Close() {
+	timeout uint32
+	reuse   bool
+	address AddressTuple[string, uint16]
+	ti      *udp.TransportInstance
 
+	actorClass func(*UDPDirector, string) *UDPActor
+	request    chan _PDU
+	peers      map[string]*UDPActor
+	running    bool
 }
 
-func NewUDPDirector(address *AddressTuple[string, uint16], timeout *int, reuse *bool, sid *int, sapID *int) (*UDPDirector, error) {
-	u := &UDPDirector{}
+func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *bool, sid *int, sapID *int) (*UDPDirector, error) {
+	d := &UDPDirector{}
 	var err error
-	u.Server, err = NewServer(sid, u)
+	d.Server, err = NewServer(sid, d)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating server")
 	}
-	u.ServiceAccessPoint, err = NewServiceAccessPoint(sapID, u)
+	d.ServiceAccessPoint, err = NewServiceAccessPoint(sapID, d)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating service access point")
 	}
 
-	// TODO: finish this
-	return u, nil
+	// check the actor class
+	d.actorClass = NewUDPActor
+
+	// save the timeout for actors
+	if timeout != nil {
+		d.timeout = uint32(*timeout)
+	}
+
+	if reuse != nil {
+		d.reuse = *reuse
+	}
+
+	// save the address
+	d.address = address
+
+	// ask the dispatcher for a socket
+	resolvedAddress, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", address.Left, address.Right))
+	if err != nil {
+		return nil, errors.Wrap(err, "error resolving udp address")
+	}
+	d.ti = udp.NewTransportInstance(resolvedAddress, nil, d.timeout, d.reuse, nil)
+
+	d.running = true
+	go func() {
+		for d.running {
+			d.handleRead()
+		}
+	}()
+
+	// create the request queue
+	d.request = make(chan _PDU)
+	go func() {
+		// TODO: get requests and send them...
+	}()
+
+	// start with an empty peer pool
+	d.peers = nil
+
+	return d, nil
+}
+
+// AddActor adds an actor when a new one is connected
+func (d *UDPDirector) AddActor(actor *UDPActor) {
+	log.Debug().Msgf("AddActor %v", actor)
+
+	d.peers[actor.peer] = actor
+
+	// tell the ASE there is a new client
+	if d.serviceElement != nil {
+		// TODO: not sure how to realize that
+		//d.SapRequest(actor)
+	}
+}
+
+// DelActor removes an actor when the socket is closed.
+func (d *UDPDirector) DelActor(actor *UDPActor) {
+	log.Debug().Msgf("DelActor %v", actor)
+
+	delete(d.peers, actor.peer)
+
+	// tell the ASE the client has gone away
+	if d.serviceElement != nil {
+		// TODO: not sure how to realize that
+		//d.SapRequest(actor)
+	}
+}
+
+func (d *UDPDirector) GetActor(address Address) *UDPActor {
+	return d.peers[address.String()]
+}
+
+func (d *UDPDirector) ActorError(err error) {
+	// tell the ASE the actor had an error
+	if d.serviceElement != nil {
+		// TODO: not sure how to realize that
+		//d.SapRequest(actor, err)
+	}
+}
+
+func (d *UDPDirector) Close() error {
+	d.running = false
+	return d.ti.Close()
+}
+
+func (d *UDPDirector) handleRead() {
+	log.Debug().Msgf("handleRead(%v)", d.address)
+
+	if err := d.ti.FillBuffer(func(pos uint, _ byte, _ *bufio.Reader) bool {
+		if pos >= 4 {
+			return false
+		}
+		return true
+	}); err != nil {
+		// pass along to a handler
+		d.handleError(errors.Wrap(err, "error filling buffer"))
+		return
+	}
+	peekedBytes, err := d.ti.PeekReadableBytes(4)
+	if err != nil {
+		// pass along to a handler
+		d.handleError(errors.Wrap(err, "error peeking 4 bytes"))
+		return
+	}
+
+	length := uint32(peekedBytes[2])<<8 | uint32(peekedBytes[3])
+	readBytes, err := d.ti.Read(length)
+
+	bvlc, err := model.BVLCParse(readBytes)
+	if err != nil {
+		// pass along to a handler
+		d.handleError(errors.Wrap(err, "error parsing bvlc"))
+		return
+	}
+
+	// TODO: how to get the addr? Maybe we ditch the transport instance and use the udp socket directly
+	pdu := NewPDU(bvlc)
+	// send the PDU up to the client
+	go d._response(pdu)
+}
+
+func (d *UDPDirector) handleError(err error) {
+	log.Debug().Err(err).Msg("handleError")
+}
+
+// Indication Client requests are queued for delivery.
+func (d *UDPDirector) Indication(pdu _PDU) error {
+	log.Debug().Msgf("Indication %s", pdu)
+
+	// get the destination
+	addr := pdu.GetPDUDestination()
+
+	// get the peer
+	peer, ok := d.peers[addr.String()]
+	if !ok {
+		peer = d.actorClass(d, (*addr).String())
+	}
+
+	// send the message
+	return peer.Indication(pdu)
+}
+
+// _response Incoming datagrams are routed through an actor.
+func (d *UDPDirector) _response(pdu _PDU) error {
+	log.Debug().Msgf("_response %s", pdu)
+
+	// get the destination
+	addr := pdu.GetPDUDestination()
+
+	// get the peer
+	peer, ok := d.peers[addr.String()]
+	if !ok {
+		peer = d.actorClass(d, (*addr).String())
+	}
+
+	// send the message
+	return peer.Response(pdu)
 }