You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/01/10 11:31:59 UTC
[plc4x] 02/02: feat(plc4go/bacnet): update udp comms
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 86b07fb9e9e21dfd7d6801013b60e04750c146aa
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Jan 10 12:31:49 2023 +0100
feat(plc4go/bacnet): update udp comms
---
.../bacnetip/BACnetVirtualLinkLayerService.go | 10 +-
plc4go/internal/bacnetip/Task.go | 4 +-
.../internal/bacnetip/UDPCommunicationsModule.go | 252 ++++++++++++++++++++-
3 files changed, 249 insertions(+), 17 deletions(-)
diff --git a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
index 9a467b43cb..32c94ad301 100644
--- a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
+++ b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
@@ -130,7 +130,7 @@ func NewUDPMultiplexer(address interface{}, noBroadcast bool) (*UDPMultiplexer,
if err != nil {
return nil, errors.Wrap(err, "error creating multiplex client")
}
- u.directPort, err = NewUDPDirector(u.addrTuple, nil, nil, nil, nil)
+ u.directPort, err = NewUDPDirector(*u.addrTuple, nil, nil, nil, nil)
if err := bind(u.direct, u.directPort); err != nil {
return nil, errors.Wrap(err, "error binding ports")
}
@@ -142,7 +142,7 @@ func NewUDPMultiplexer(address interface{}, noBroadcast bool) (*UDPMultiplexer,
return nil, errors.Wrap(err, "error creating broadcast multiplex client")
}
reuse := true
- u.broadcastPort, err = NewUDPDirector(u.addrBroadcastTuple, nil, &reuse, nil, nil)
+ u.broadcastPort, err = NewUDPDirector(*u.addrBroadcastTuple, nil, &reuse, nil, nil)
if err := bind(u.direct, u.directPort); err != nil {
return nil, errors.Wrap(err, "error binding ports")
}
@@ -236,11 +236,13 @@ func NewAnnexJCodec(cid *int, sid *int) (*AnnexJCodec, error) {
}
+// Indication forwards pdu unchanged to b.Request and returns its result.
func (b *AnnexJCodec) Indication(pdu _PDU) error {
- panic("not implemented yet")
+ // Note: our BVLC are all annexJ at the moment
+ return b.Request(pdu)
}
+// Confirmation forwards pdu unchanged to b.Request and returns its result.
+//
+// NOTE(review): this is the same call Indication makes; confirm whether a
+// confirmation is really meant to go to Request rather than to Response.
func (b *AnnexJCodec) Confirmation(pdu _PDU) error {
- panic("not implemented yet")
+ // Note: our BVLC are all annexJ at the moment
+ return b.Request(pdu)
}
type _BIPSAP interface {
diff --git a/plc4go/internal/bacnetip/Task.go b/plc4go/internal/bacnetip/Task.go
index 32700aa341..d6ce37ed35 100644
--- a/plc4go/internal/bacnetip/Task.go
+++ b/plc4go/internal/bacnetip/Task.go
@@ -47,7 +47,7 @@ type OneShotDeleteTask struct {
_Task
}
-func FunctionTask(func()) _Task {
+// FunctionTask is still a stub: the function passed in is ignored (the
+// parameter is unnamed) and a pointer to an empty _Task is returned.
+func FunctionTask(func()) *_Task {
// TODO: implement me
- return _Task{}
+ return &_Task{}
}
diff --git a/plc4go/internal/bacnetip/UDPCommunicationsModule.go b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
index 43714a3ebd..5eb1c302a7 100644
--- a/plc4go/internal/bacnetip/UDPCommunicationsModule.go
+++ b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
@@ -20,38 +20,268 @@
package bacnetip
import (
+ "bufio"
+ "fmt"
+ "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "github.com/apache/plc4x/plc4go/spi/transports/udp"
"github.com/pkg/errors"
+ "github.com/rs/zerolog/log"
+ "net"
+ "time"
)
+// UDPActor represents one peer of a UDPDirector.
type UDPActor struct {
+ // director is the UDPDirector this actor reports to
+ director *UDPDirector
+ // timeout is copied from the director and used as a millisecond count
+ // whenever timer is (re)installed
+ timeout uint32
+ // timer is only set when timeout is greater than zero
+ timer *_Task
+ // peer is the key under which the director stores this actor
+ peer string
}
-type UDPPickleActor struct {
- *UDPActor
+// NewUDPActor creates the actor for peer, registers it with director via
+// AddActor and returns it.
+//
+// The director's timeout is copied into the actor. When it is greater than
+// zero an idle timer is installed that fires that many milliseconds from now.
+func NewUDPActor(director *UDPDirector, peer string) *UDPActor {
+ actor := &UDPActor{
+ director: director,
+ peer: peer,
+ timeout: director.timeout,
+ }
+
+ // an idle timer is only needed when a timeout was configured
+ if actor.timeout > 0 {
+ actor.timer = FunctionTask(actor.idleTimeout)
+ deadline := time.Now().Add(time.Duration(actor.timeout) * time.Millisecond)
+ actor.timer.InstallTask(&deadline, nil)
+ }
+
+ // let the director know about the new actor
+ actor.director.AddActor(actor)
+ return actor
+}
+
+// idleTimeout is the function NewUDPActor hands to FunctionTask; it removes
+// this actor from its director.
+func (a *UDPActor) idleTimeout() {
+ log.Debug().Msg("idleTimeout")
+
+ // tell the director this is gone
+ a.director.DelActor(a)
+}
+
+// Indication pushes the idle timer out by another timeout period (when a
+// timer exists) and hands pdu to the director's request channel.
+//
+// It always returns nil. The channel send blocks until something receives
+// from director.request.
+func (a *UDPActor) Indication(pdu _PDU) error {
+ log.Debug().Msgf("Indication %s", pdu)
+
+ // the actor is in use again, so restart the idle period
+ if timer := a.timer; timer != nil {
+ deadline := time.Now().Add(time.Duration(a.timeout) * time.Millisecond)
+ timer.InstallTask(&deadline, nil)
+ }
+
+ // queue the pdu for the director
+ director := a.director
+ director.request <- pdu
+ return nil
+}
+
+// Response pushes the idle timer out by another timeout period (when a timer
+// exists) and passes pdu on to the director's Response, returning its result.
+func (a *UDPActor) Response(pdu _PDU) error {
+ log.Debug().Msgf("Response %s", pdu)
+
+ // the actor is in use again, so restart the idle period
+ if timer := a.timer; timer != nil {
+ deadline := time.Now().Add(time.Duration(a.timeout) * time.Millisecond)
+ timer.InstallTask(&deadline, nil)
+ }
+
+ // let the director treat it as a response
+ director := a.director
+ return director.Response(pdu)
+}
+
+// HandleError logs err at debug level and, when err is not nil, reports it to
+// the director through ActorError.
+func (a *UDPActor) HandleError(err error) {
+ log.Debug().Err(err).Msg("HandleError")
+
+ // nothing to report for a nil error
+ if err == nil {
+ return
+ }
+ a.director.ActorError(err)
}
-// TODO: finish me
+// UDPDirector owns the UDP transport instance and the pool of UDPActor peers.
type UDPDirector struct {
*Server
*ServiceAccessPoint
-}
-func (d *UDPDirector) Close() {
+ // timeout is taken from the constructor argument, passed to the transport
+ // instance and copied into every actor
+ timeout uint32
+ // reuse is taken from the constructor argument and passed to the transport instance
+ reuse bool
+ // address is the address the director was created with
+ address AddressTuple[string, uint16]
+ // ti is the transport instance handleRead reads from and Close closes
+ ti *udp.TransportInstance
+ // actorClass creates the actor for a peer that is not in peers yet
+ actorClass func(*UDPDirector, string) *UDPActor
+ // request receives the PDUs that actors pass on in Indication
+ request chan _PDU
+ // peers holds the known actors, keyed by their peer string
+ peers map[string]*UDPActor
+ // running keeps the read loop going until Close sets it to false
+ running bool
}
-func NewUDPDirector(address *AddressTuple[string, uint16], timeout *int, reuse *bool, sid *int, sapID *int) (*UDPDirector, error) {
- u := &UDPDirector{}
+// NewUDPDirector builds a director for the given address and starts its read
+// loop.
+//
+// timeout, when non-nil, is stored for the actors and also handed to the UDP
+// transport instance; reuse, when non-nil, is forwarded to the transport
+// instance as well. sid and sapID are passed through to NewServer and
+// NewServiceAccessPoint.
+//
+// An error is returned when the server or the service access point cannot be
+// created or when the address cannot be resolved.
+func NewUDPDirector(address AddressTuple[string, uint16], timeout *int, reuse *bool, sid *int, sapID *int) (*UDPDirector, error) {
+ d := &UDPDirector{}
var err error
- u.Server, err = NewServer(sid, u)
+ d.Server, err = NewServer(sid, d)
if err != nil {
return nil, errors.Wrap(err, "error creating server")
}
- u.ServiceAccessPoint, err = NewServiceAccessPoint(sapID, u)
+ d.ServiceAccessPoint, err = NewServiceAccessPoint(sapID, d)
if err != nil {
return nil, errors.Wrap(err, "error creating service access point")
}
- // TODO: finish this
- return u, nil
+ // check the actor class
+ d.actorClass = NewUDPActor
+
+ // save the timeout for actors
+ if timeout != nil {
+ d.timeout = uint32(*timeout)
+ }
+
+ if reuse != nil {
+ d.reuse = *reuse
+ }
+
+ // save the address
+ d.address = address
+
+ // create the request queue and an empty (but usable) peer pool before any
+ // goroutine is started: the read loop ends up in AddActor, which writes to
+ // the pool, and writing to a nil map panics
+ d.request = make(chan _PDU)
+ d.peers = make(map[string]*UDPActor)
+
+ // ask the dispatcher for a socket
+ resolvedAddress, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", address.Left, address.Right))
+ if err != nil {
+ return nil, errors.Wrap(err, "error resolving udp address")
+ }
+ d.ti = udp.NewTransportInstance(resolvedAddress, nil, d.timeout, d.reuse, nil)
+
+ // keep reading until Close clears the running flag
+ d.running = true
+ go func() {
+ for d.running {
+ d.handleRead()
+ }
+ }()
+
+ go func() {
+ // TODO: get requests and send them...
+ }()
+
+ return d, nil
+}
+
+// AddActor adds an actor when a new one is connected; it is stored under its
+// peer string.
+//
+// The peer pool is created on first use, because writing to a nil map panics.
+func (d *UDPDirector) AddActor(actor *UDPActor) {
+ log.Debug().Msgf("AddActor %v", actor)
+
+ if d.peers == nil {
+ d.peers = make(map[string]*UDPActor)
+ }
+ d.peers[actor.peer] = actor
+
+ // tell the ASE there is a new client
+ if d.serviceElement != nil {
+ // TODO: not sure how to realize that
+ //d.SapRequest(actor)
+ }
+}
+
+// DelActor removes an actor when the socket is closed.
+//
+// The entry stored under actor.peer is deleted from the peer pool; notifying
+// the ASE is not implemented yet.
+func (d *UDPDirector) DelActor(actor *UDPActor) {
+ log.Debug().Msgf("DelActor %v", actor)
+
+ delete(d.peers, actor.peer)
+
+ // tell the ASE the client has gone away
+ if d.serviceElement != nil {
+ // TODO: not sure how to realize that
+ //d.SapRequest(actor)
+ }
+}
+
+// GetActor returns the actor stored under address.String(), or nil when the
+// peer pool has no entry for it.
+func (d *UDPDirector) GetActor(address Address) *UDPActor {
+ return d.peers[address.String()]
+}
+
+// ActorError is where the ASE is meant to be told that an actor had an error.
+// The notification itself is not implemented yet, so err is currently unused.
+func (d *UDPDirector) ActorError(err error) {
+ // without a service element there is nobody to tell
+ if d.serviceElement == nil {
+ return
+ }
+
+ // tell the ASE the actor had an error
+ // TODO: not sure how to realize that
+ //d.SapRequest(actor, err)
+}
+
+// Close clears the running flag that the read loop checks and closes the
+// transport instance, returning the error of that close.
+func (d *UDPDirector) Close() error {
+ d.running = false
+ return d.ti.Close()
+}
+
+// handleRead reads one BVLC message from the transport instance, wraps it in
+// a PDU and passes that PDU to _response on a new goroutine.
+//
+// Every failure (filling the buffer, peeking the header, reading the message,
+// parsing it) is handed to handleError and ends the call.
+func (d *UDPDirector) handleRead() {
+ log.Debug().Msgf("handleRead(%v)", d.address)
+
+ // ask the transport to keep buffering while pos is below 4
+ if err := d.ti.FillBuffer(func(pos uint, _ byte, _ *bufio.Reader) bool {
+ return pos < 4
+ }); err != nil {
+ // pass along to a handler
+ d.handleError(errors.Wrap(err, "error filling buffer"))
+ return
+ }
+ peekedBytes, err := d.ti.PeekReadableBytes(4)
+ if err != nil {
+ // pass along to a handler
+ d.handleError(errors.Wrap(err, "error peeking 4 bytes"))
+ return
+ }
+
+ // bytes 2 and 3 are combined big-endian into the number of bytes to read
+ length := uint32(peekedBytes[2])<<8 | uint32(peekedBytes[3])
+ readBytes, err := d.ti.Read(length)
+ if err != nil {
+ // pass along to a handler instead of parsing whatever came back
+ d.handleError(errors.Wrap(err, "error reading bvlc bytes"))
+ return
+ }
+
+ bvlc, err := model.BVLCParse(readBytes)
+ if err != nil {
+ // pass along to a handler
+ d.handleError(errors.Wrap(err, "error parsing bvlc"))
+ return
+ }
+
+ // TODO: how to get the addr? Maybe we ditch the transport instance and use the udp socket directly
+ pdu := NewPDU(bvlc)
+ // send the PDU up to the client
+ go d._response(pdu)
+}
+
+// handleError only logs err at debug level.
+func (d *UDPDirector) handleError(err error) {
+ log.Debug().Err(err).Msg("handleError")
+}
+
+// Indication Client requests are queued for delivery.
+//
+// The actor for the PDU's destination is looked up in the peer pool, created
+// through actorClass when it is missing, and then given the PDU. A PDU
+// without a destination is rejected with an error instead of dereferencing a
+// nil address.
+func (d *UDPDirector) Indication(pdu _PDU) error {
+ log.Debug().Msgf("Indication %s", pdu)
+
+ // get the destination
+ addr := pdu.GetPDUDestination()
+ if addr == nil {
+ return errors.New("pdu has no destination")
+ }
+ key := addr.String()
+
+ // get the peer
+ peer, ok := d.peers[key]
+ if !ok {
+ peer = d.actorClass(d, key)
+ }
+
+ // send the message
+ return peer.Indication(pdu)
+}
+
+// _response Incoming datagrams are routed through an actor.
+//
+// The actor is looked up by the PDU's destination, created through actorClass
+// when it is missing, and then given the PDU. A PDU without a destination is
+// rejected with an error instead of dereferencing a nil address.
+//
+// NOTE(review): for an incoming datagram the peer would presumably be the
+// sender rather than the destination; confirm which address is meant here.
+func (d *UDPDirector) _response(pdu _PDU) error {
+ log.Debug().Msgf("_response %s", pdu)
+
+ // get the destination
+ addr := pdu.GetPDUDestination()
+ if addr == nil {
+ return errors.New("pdu has no destination")
+ }
+ key := addr.String()
+
+ // get the peer
+ peer, ok := d.peers[key]
+ if !ok {
+ peer = d.actorClass(d, key)
+ }
+
+ // send the message
+ return peer.Response(pdu)
}