You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/16 15:56:02 UTC
[plc4x] branch develop updated: refactor(plc4go/bacnet): use generic messages for integration layer
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 1e22184399 refactor(plc4go/bacnet): use generic messages for integration layer
1e22184399 is described below
commit 1e22184399e566a1d5663310f4c20d8267780951
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Nov 16 16:55:54 2022 +0100
refactor(plc4go/bacnet): use generic messages for integration layer
---
plc4go/internal/bacnetip/ApplicationLayer.go | 59 +++++----
plc4go/internal/bacnetip/ApplicationModule.go | 69 +++++-----
.../bacnetip/BACnetVirtualLinkLayerService.go | 146 ++++++++++++++++++---
plc4go/internal/bacnetip/CommunicationsModule.go | 76 +++++------
plc4go/internal/bacnetip/Driver.go | 11 +-
plc4go/internal/bacnetip/IOCBModule.go | 52 ++++++--
plc4go/internal/bacnetip/NetworkService.go | 143 +++++++++++++++++++-
...etworkService.go => UDPCommunicationsModule.go} | 38 ++++--
8 files changed, 446 insertions(+), 148 deletions(-)
diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go
index d7a5ce94f3..ee33b885e8 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -23,6 +23,7 @@ import (
"bytes"
"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
@@ -284,7 +285,7 @@ func (s *SSM) getSegment(index uint8) (segmentAPDU readWriteModel.APDU, moreFoll
// TODO: check that function. looks a bit wonky to just append the payloads like that
// appendSegment This function appends the apdu content to the end of the current APDU being built. The segmentAPDU is
// the context
-func (s *SSM) appendSegment(apdu readWriteModel.APDU) error {
+func (s *SSM) appendSegment(apdu spi.Message) error {
log.Debug().Msgf("appendSegment\n%s", apdu)
switch apdu := apdu.(type) {
case readWriteModel.APDUConfirmedRequestExactly:
@@ -373,7 +374,7 @@ func (s *ClientSSM) setState(newState SSMState, timer *uint) error {
}
// Request This function is called by client transaction functions when it wants to send a message to the device
-func (s *ClientSSM) Request(apdu readWriteModel.APDU) error {
+func (s *ClientSSM) Request(apdu spi.Message) error {
log.Debug().Msgf("request\n%s", apdu)
// TODO: ensure apdu has destination, otherwise
// TODO: we would need a BVLC to send something or not... maybe the todo above is nonsense, as we are in a connection context
@@ -382,7 +383,7 @@ func (s *ClientSSM) Request(apdu readWriteModel.APDU) error {
// Indication This function is called after the device has bound a new transaction and wants to start the process
// rolling
-func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe use another name for that
+func (s *ClientSSM) Indication(apdu spi.Message) error { // TODO: maybe use another name for that
log.Debug().Msgf("indication\n%s", apdu)
// make sure we're getting confirmed requests
var apduConfirmedRequest readWriteModel.APDUConfirmedRequest
@@ -391,7 +392,7 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
}
// save the request and set the segmentation context
- if err := s.setSegmentationContext(apdu); err != nil {
+ if err := s.setSegmentationContext(apduConfirmedRequest); err != nil {
return errors.Wrap(err, "error setting context")
}
@@ -483,7 +484,7 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
}
// Response This function is called by client transaction functions when they want to send a message to the application.
-func (s *ClientSSM) Response(apdu readWriteModel.APDU) error {
+func (s *ClientSSM) Response(apdu spi.Message) error {
log.Debug().Msgf("response\n%s", apdu)
// make sure it has a good source and destination
// TODO: check if source == s.pduAddress
@@ -494,7 +495,7 @@ func (s *ClientSSM) Response(apdu readWriteModel.APDU) error {
}
// Confirmation This function is called by the device for all upstream messages related to the transaction.
-func (s *ClientSSM) Confirmation(apdu readWriteModel.APDU) error {
+func (s *ClientSSM) Confirmation(apdu spi.Message) error {
log.Debug().Msgf("confirmation\n%s", apdu)
switch s.state {
@@ -542,7 +543,7 @@ func (s *ClientSSM) abort(reason readWriteModel.BACnetAbortReason) (readWriteMod
}
// segmentedRequest This function is called when the client is sending a segmented request and receives an apdu
-func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
+func (s *ClientSSM) segmentedRequest(apdu spi.Message) error {
log.Debug().Msgf("segmentedRequest\n%s", apdu)
switch apdu := apdu.(type) {
@@ -679,7 +680,7 @@ func (s *ClientSSM) segmentedRequestTimeout() error {
return nil
}
-func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
+func (s *ClientSSM) awaitConfirmation(apdu spi.Message) error {
log.Debug().Msgf("awaitConfirmation\n%s", apdu)
switch apdu := apdu.(type) {
@@ -794,7 +795,7 @@ func (s *ClientSSM) awaitConfirmationTimeout() error {
return nil
}
-func (s *ClientSSM) segmentedConfirmation(apdu readWriteModel.APDU) error {
+func (s *ClientSSM) segmentedConfirmation(apdu spi.Message) error {
log.Debug().Msgf("segmentedConfirmation\n%s", apdu)
// the only messages we should be getting are complex acks
@@ -941,7 +942,7 @@ func (s *ServerSSM) setState(newState SSMState, timer *uint) error {
}
// Request This function is called by transaction functions to send to the application
-func (s *ServerSSM) Request(apdu readWriteModel.APDU) error {
+func (s *ServerSSM) Request(apdu spi.Message) error {
log.Debug().Msgf("request\n%s", apdu)
// TODO: ensure apdu has destination, otherwise
// TODO: we would need a BVLC to send something or not... maybe the todo above is nonsense, as we are in a connection context
@@ -950,7 +951,7 @@ func (s *ServerSSM) Request(apdu readWriteModel.APDU) error {
// Indication This function is called for each downstream packet related to
// the transaction
-func (s *ServerSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe use another name for that
+func (s *ServerSSM) Indication(apdu spi.Message) error { // TODO: maybe use another name for that
log.Debug().Msgf("indication\n%s", apdu)
// make sure we're getting confirmed requests
@@ -969,7 +970,7 @@ func (s *ServerSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
}
// Response This function is called by client transaction functions when they want to send a message to the application.
-func (s *ServerSSM) Response(apdu readWriteModel.APDU) error {
+func (s *ServerSSM) Response(apdu spi.Message) error {
log.Debug().Msgf("response\n%s", apdu)
// make sure it has a good source and destination
// TODO: check if source == none
@@ -981,7 +982,7 @@ func (s *ServerSSM) Response(apdu readWriteModel.APDU) error {
// Confirmation This function is called when the application has provided a response and needs it to be sent to the
// client.
-func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
+func (s *ServerSSM) Confirmation(apdu spi.Message) error {
log.Debug().Msgf("confirmation\n%s", apdu)
// check to see we are in the correct state
@@ -1143,7 +1144,7 @@ func (s *ServerSSM) abort(reason readWriteModel.BACnetAbortReason) (readWriteMod
return abortApdu, nil
}
-func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
+func (s *ServerSSM) idle(apdu spi.Message) error {
log.Debug().Msgf("idle %s", apdu)
// make sure we're getting confirmed requests
@@ -1217,7 +1218,7 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
}
// save the response and set the segmentation context
- if err := s.setSegmentationContext(apdu); err != nil {
+ if err := s.setSegmentationContext(apduConfirmedRequest); err != nil {
return errors.Wrap(err, "error settings segmentation context")
}
@@ -1239,7 +1240,7 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
return s.Response(segack)
}
-func (s *ServerSSM) segmentedRequest(apdu readWriteModel.APDU) error {
+func (s *ServerSSM) segmentedRequest(apdu spi.Message) error {
log.Debug().Msgf("segmentedRequest\n%s", apdu)
// some kind of problem
@@ -1355,7 +1356,7 @@ func (s *ServerSSM) segmentedRequestTimeout() error {
return nil
}
-func (s *ServerSSM) awaitResponse(apdu readWriteModel.APDU) error {
+func (s *ServerSSM) awaitResponse(apdu spi.Message) error {
log.Debug().Msgf("awaitResponse\n%s", apdu)
switch apdu.(type) {
@@ -1392,7 +1393,7 @@ func (s *ServerSSM) awaitResponseTimeout() error {
return nil
}
-func (s *ServerSSM) segmentedResponse(apdu readWriteModel.APDU) error {
+func (s *ServerSSM) segmentedResponse(apdu spi.Message) error {
log.Debug().Msgf("segmentedResponse\n%s", apdu)
// client is ready for the next segment
@@ -1705,9 +1706,13 @@ func (s *StateMachineAccessPoint) ConfirmationFromSource(apdu readWriteModel.APD
}
// SapIndication This function is called when the application is requesting a new transaction as a client.
-func (s *StateMachineAccessPoint) SapIndication(apdu readWriteModel.APDU, pduDestination []byte) error {
+func (s *StateMachineAccessPoint) SapIndication(apdu spi.Message) error {
log.Debug().Msgf("sapIndication\n%s", apdu)
+ // TODO: extract from somewhere
+ var pduDestination []byte
+ panic("we need pduDestination")
+
// check device communication control
switch s.dccEnableDisable {
case readWriteModel.BACnetConfirmedServiceRequestDeviceCommunicationControlEnableDisable_ENABLE:
@@ -1717,7 +1722,8 @@ func (s *StateMachineAccessPoint) SapIndication(apdu readWriteModel.APDU, pduDes
return nil
case readWriteModel.BACnetConfirmedServiceRequestDeviceCommunicationControlEnableDisable_DISABLE_INITIATION:
log.Debug().Msg("initiation disabled")
- if apdu.GetApduType() == readWriteModel.ApduType_UNCONFIRMED_REQUEST_PDU && apdu.(readWriteModel.APDUUnconfirmedRequest).GetServiceRequest().GetServiceChoice() == readWriteModel.BACnetUnconfirmedServiceChoice_I_AM {
+ // TODO: this should be quarded
+ if apdu.(readWriteModel.APDU).GetApduType() == readWriteModel.ApduType_UNCONFIRMED_REQUEST_PDU && apdu.(readWriteModel.APDUUnconfirmedRequest).GetServiceRequest().GetServiceChoice() == readWriteModel.BACnetUnconfirmedServiceChoice_I_AM {
log.Debug().Msg("continue with I-Am")
} else {
log.Debug().Msg("not an I-Am")
@@ -1766,8 +1772,11 @@ func (s *StateMachineAccessPoint) SapIndication(apdu readWriteModel.APDU, pduDes
// SapConfirmation This function is called when the application is responding to a request, the apdu may be a simple
// ack, complex ack, error, reject or abort
-func (s *StateMachineAccessPoint) SapConfirmation(apdu readWriteModel.APDU, pduDestination []byte) error {
+func (s *StateMachineAccessPoint) SapConfirmation(apdu spi.Message) error {
log.Debug().Msgf("sapConfirmation\n%s", apdu)
+ // TODO: extract from somewhere
+ var pduDestination []byte
+ panic("we need pduDestination")
switch apdu.(type) {
case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly, readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly:
// find the client transaction this is acking
@@ -1837,7 +1846,7 @@ func NewApplicationServiceAccessPoint(aseID *int, sapID *int) (*ApplicationServi
}
// TODO: big WIP
-func (a *ApplicationServiceAccessPoint) Indication(apdu readWriteModel.APDU) error {
+func (a *ApplicationServiceAccessPoint) Indication(apdu spi.Message) error {
log.Debug().Msgf("Indication\n%s", apdu)
switch apdu := apdu.(type) {
@@ -1890,7 +1899,7 @@ func (a *ApplicationServiceAccessPoint) Indication(apdu readWriteModel.APDU) err
}
// TODO: big WIP
-func (a *ApplicationServiceAccessPoint) SapIndication(apdu readWriteModel.APDU, pduDestination []byte) error {
+func (a *ApplicationServiceAccessPoint) SapIndication(apdu spi.Message) error {
log.Debug().Msgf("SapIndication\n%s", apdu)
// TODO: check if we need to check apdu here
@@ -1899,7 +1908,7 @@ func (a *ApplicationServiceAccessPoint) SapIndication(apdu readWriteModel.APDU,
}
// TODO: big WIP
-func (a *ApplicationServiceAccessPoint) Confirmation(apdu readWriteModel.APDU) error {
+func (a *ApplicationServiceAccessPoint) Confirmation(apdu spi.Message) error {
log.Debug().Msgf("Confirmation\n%s", apdu)
// TODO: check if we need to check apdu here
@@ -1908,7 +1917,7 @@ func (a *ApplicationServiceAccessPoint) Confirmation(apdu readWriteModel.APDU) e
}
// TODO: big WIP
-func (a *ApplicationServiceAccessPoint) SapConfirmation(apdu readWriteModel.APDU, pduDestination []byte) error {
+func (a *ApplicationServiceAccessPoint) SapConfirmation(apdu spi.Message) error {
log.Debug().Msgf("SapConfirmation\n%s", apdu)
// TODO: check if we need to check apdu here
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index b974e4eeda..7d3bab5dfe 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
"github.com/apache/plc4x/plc4go/internal/bacnetip/service"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "github.com/apache/plc4x/plc4go/spi"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"hash/fnv"
@@ -91,14 +92,18 @@ func NewDeviceInfoCache() *DeviceInfoCache {
}
}
+func (d *DeviceInfoCache) String() string {
+ return fmt.Sprintf("%#q", d)
+}
+
// HasDeviceInfo Return true if cache has information about the device.
-func (i *DeviceInfoCache) HasDeviceInfo(key DeviceInfoCacheKey) bool {
- _, ok := i.cache[key.HashKey()]
+func (d *DeviceInfoCache) HasDeviceInfo(key DeviceInfoCacheKey) bool {
+ _, ok := d.cache[key.HashKey()]
return ok
}
// IAmDeviceInfo Create a device information record based on the contents of an IAmRequest and put it in the cache.
-func (i *DeviceInfoCache) IAmDeviceInfo(iAm readWriteModel.BACnetUnconfirmedServiceRequestIAm, pduSource []byte) {
+func (d *DeviceInfoCache) IAmDeviceInfo(iAm readWriteModel.BACnetUnconfirmedServiceRequestIAm, pduSource []byte) {
log.Debug().Msgf("IAmDeviceInfo\n%s", iAm)
deviceIdentifier := iAm.GetDeviceIdentifier()
@@ -106,11 +111,11 @@ func (i *DeviceInfoCache) IAmDeviceInfo(iAm readWriteModel.BACnetUnconfirmedServ
deviceInstance := deviceIdentifier.GetInstanceNumber()
// get the existing cache record if it exists
- deviceInfo, ok := i.cache[DeviceInfoCacheKey{&deviceInstance, nil}.HashKey()]
+ deviceInfo, ok := d.cache[DeviceInfoCacheKey{&deviceInstance, nil}.HashKey()]
// maybe there is a record for this address
if !ok {
- deviceInfo, ok = i.cache[DeviceInfoCacheKey{nil, pduSource}.HashKey()]
+ deviceInfo, ok = d.cache[DeviceInfoCacheKey{nil, pduSource}.HashKey()]
}
// make a new one using the class provided
@@ -130,15 +135,15 @@ func (i *DeviceInfoCache) IAmDeviceInfo(iAm readWriteModel.BACnetUnconfirmedServ
deviceInfo.VendorId = &vendorId
// tell the cache this is an updated record
- i.UpdateDeviceInfo(deviceInfo)
+ d.UpdateDeviceInfo(deviceInfo)
}
// GetDeviceInfo gets a DeviceInfo from cache
-func (i *DeviceInfoCache) GetDeviceInfo(key DeviceInfoCacheKey) (DeviceInfo, bool) {
+func (d *DeviceInfoCache) GetDeviceInfo(key DeviceInfoCacheKey) (DeviceInfo, bool) {
log.Debug().Msgf("GetDeviceInfo %s", key)
// get the info if it's there
- deviceInfo, ok := i.cache[key.HashKey()]
+ deviceInfo, ok := d.cache[key.HashKey()]
log.Debug().Msgf("deviceInfo: %#v", deviceInfo)
return deviceInfo, ok
@@ -147,7 +152,7 @@ func (i *DeviceInfoCache) GetDeviceInfo(key DeviceInfoCacheKey) (DeviceInfo, boo
// UpdateDeviceInfo The application has updated one or more fields in the device information record and the cache needs
// to be updated to reflect the changes. If this is a cached version of a persistent record then this is the
// opportunity to update the database.
-func (i *DeviceInfoCache) UpdateDeviceInfo(deviceInfo DeviceInfo) {
+func (d *DeviceInfoCache) UpdateDeviceInfo(deviceInfo DeviceInfo) {
log.Debug().Msgf("UpdateDeviceInfo %#v", deviceInfo)
// get the current key
@@ -155,13 +160,13 @@ func (i *DeviceInfoCache) UpdateDeviceInfo(deviceInfo DeviceInfo) {
if cacheKey.Instance != nil && deviceInfo.DeviceIdentifier.GetInstanceNumber() != *cacheKey.Instance {
instanceNumber := deviceInfo.DeviceIdentifier.GetInstanceNumber()
cacheKey.Instance = &instanceNumber
- delete(i.cache, cacheKey.HashKey())
- i.cache[DeviceInfoCacheKey{Instance: &instanceNumber}.HashKey()] = deviceInfo
+ delete(d.cache, cacheKey.HashKey())
+ d.cache[DeviceInfoCacheKey{Instance: &instanceNumber}.HashKey()] = deviceInfo
}
if bytes.Compare(deviceInfo.Address, cacheKey.PduSource) != 0 {
cacheKey.PduSource = deviceInfo.Address
- delete(i.cache, cacheKey.HashKey())
- i.cache[DeviceInfoCacheKey{PduSource: cacheKey.PduSource}.HashKey()] = deviceInfo
+ delete(d.cache, cacheKey.HashKey())
+ d.cache[DeviceInfoCacheKey{PduSource: cacheKey.PduSource}.HashKey()] = deviceInfo
}
// update the key
@@ -170,25 +175,25 @@ func (i *DeviceInfoCache) UpdateDeviceInfo(deviceInfo DeviceInfo) {
Instance: &instanceNumber,
PduSource: deviceInfo.Address,
}
- i.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo
+ d.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo
}
// Acquire Return the known information about the device and mark the record as being used by a segmentation state
// machine.
-func (i *DeviceInfoCache) Acquire(key DeviceInfoCacheKey) (DeviceInfo, bool) {
+func (d *DeviceInfoCache) Acquire(key DeviceInfoCacheKey) (DeviceInfo, bool) {
log.Debug().Msgf("Acquire %#v", key)
- deviceInfo, ok := i.cache[key.HashKey()]
+ deviceInfo, ok := d.cache[key.HashKey()]
if ok {
deviceInfo._refCount++
- i.cache[key.HashKey()] = deviceInfo
+ d.cache[key.HashKey()] = deviceInfo
}
return deviceInfo, ok
}
// Release This function is called by the segmentation state machine when it has finished with the device information.
-func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
+func (d *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
//this information record might be used by more than one SSM
if deviceInfo._refCount == 0 {
@@ -197,7 +202,7 @@ func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
// decrement the reference count
deviceInfo._refCount--
- i.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo
+ d.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo
return nil
}
@@ -211,11 +216,11 @@ type Application struct {
localDevice *local.LocalDeviceObject
deviceInfoCache *DeviceInfoCache
controllers map[string]interface{}
- helpers map[string]func(apdu readWriteModel.APDU) error
+ helpers map[string]func(pdu spi.Message) error
}
func NewApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*Application, error) {
- log.Debug().Msgf("NewApplication %v %s deviceInfoCache=%v aseID=%d", localDevice, localAddress, deviceInfoCache, aseID)
+ log.Debug().Msgf("NewApplication %v %s deviceInfoCache=%s aseID=%d", localDevice, localAddress, deviceInfoCache, aseID)
a := &Application{}
var err error
a.ApplicationServiceElement, err = NewApplicationServiceElement(aseID, a)
@@ -256,7 +261,7 @@ func NewApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr,
return a, nil
}
-func (a *Application) Request(apdu readWriteModel.APDU) error {
+func (a *Application) Request(apdu spi.Message) error {
log.Debug().Msgf("Request\n%s", apdu)
// double-check the input is the right kind of APDU
@@ -268,7 +273,7 @@ func (a *Application) Request(apdu readWriteModel.APDU) error {
return a.ApplicationServiceElement.Request(apdu)
}
-func (a *Application) Indication(apdu readWriteModel.APDU) error {
+func (a *Application) Indication(apdu spi.Message) error {
log.Debug().Msgf("Indication\n%s", apdu)
// get a helper function
@@ -337,7 +342,7 @@ func (a *ApplicationIOController) ProcessIO(iocb _IOCB) error {
return queue.RequestIO(iocb)
}
-func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu readWriteModel.APDU) error {
+func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu spi.Message) error {
log.Debug().Msgf("_AppComplete %s\n%s", address, apdu)
// look up the queue
@@ -372,7 +377,7 @@ func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu readWriteM
return nil
}
-func (a *ApplicationIOController) _AppRequest(apdu readWriteModel.APDU) {
+func (a *ApplicationIOController) _AppRequest(apdu spi.Message) {
log.Debug().Msgf("_AppRequest\n%s", apdu)
if err := a.Request(apdu); err != nil {
@@ -393,7 +398,7 @@ func (a *ApplicationIOController) _AppRequest(apdu readWriteModel.APDU) {
}
}
-func (a *ApplicationIOController) Request(apdu readWriteModel.APDU) error {
+func (a *ApplicationIOController) Request(apdu spi.Message) error {
log.Debug().Msgf("Request\n%s", apdu)
// if this is not unconfirmed request, tell the application to use the IOCB interface
@@ -405,7 +410,7 @@ func (a *ApplicationIOController) Request(apdu readWriteModel.APDU) error {
return a.Application.Request(apdu)
}
-func (a *ApplicationIOController) Confirmation(apdu readWriteModel.APDU) error {
+func (a *ApplicationIOController) Confirmation(apdu spi.Message) error {
log.Debug().Msgf("Confirmation\n%s", apdu)
// this is an ack, error, reject or abort
@@ -418,7 +423,7 @@ type BIPSimpleApplication struct {
*ApplicationIOController
*service.WhoIsIAmServices
*service.ReadWritePropertyServices
- localAddress interface{}
+ localAddress net.Addr
asap *ApplicationServiceAccessPoint
smap *StateMachineAccessPoint
nsap *NetworkServiceAccessPoint
@@ -454,18 +459,18 @@ func NewBIPSimpleApplication(localDevice *local.LocalDeviceObject, localAddress
// Note: deviceInfoCache already passed above, so we don't need to do it again here
// a network service access point will be needed
- b.nsap, err = NewNetworkServiceAccessPoint()
+ b.nsap, err = NewNetworkServiceAccessPoint(nil, nil, nil)
if err != nil {
return nil, errors.Wrap(err, "error creating network service access point")
}
// give the NSAP a generic network layer service element
- b.nse, err = NewNetworkServiceElement()
+ b.nse, err = NewNetworkServiceElement(nil)
if err != nil {
return nil, errors.Wrap(err, "error creating new network service element")
}
if err := bind(b.nse, b.nsap); err != nil {
- return nil, errors.New("error binding network stack")
+ return nil, errors.Wrap(err, "error binding network stack")
}
// bind the top layers
@@ -489,7 +494,7 @@ func NewBIPSimpleApplication(localDevice *local.LocalDeviceObject, localAddress
// bind the bottom layers
if err := bind(b.bip, b.annexj, b.mux.annexJ); err != nil {
- return nil, errors.New("error binding bottom layers")
+ return nil, errors.Wrap(err, "error binding bottom layers")
}
// bind the BIP stack to the network, no network number
diff --git a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
index 959ab1b074..ddd742e689 100644
--- a/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
+++ b/plc4go/internal/bacnetip/BACnetVirtualLinkLayerService.go
@@ -20,27 +20,133 @@
package bacnetip
import (
- readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "github.com/apache/plc4x/plc4go/spi"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
+ "net"
)
-type UDPMultiplexer struct {
- annexJ interface{}
+type _MultiplexClient struct {
+ *Client
+ multiplexer *UDPMultiplexer
}
-func (m *UDPMultiplexer) Close() error {
- panic("implement me")
+func _New_MultiplexClient(multiplexer *UDPMultiplexer) (*_MultiplexClient, error) {
+ m := &_MultiplexClient{
+ multiplexer: multiplexer,
+ }
+ var err error
+ m.Client, err = NewClient(nil, m)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating client")
+ }
+ return m, nil
+}
+
+func (m *_MultiplexClient) Confirmation(pdu spi.Message) error {
+ return m.multiplexer.Confirmation(pdu)
+}
+
+type _MultiplexServer struct {
+ *Server
+ multiplexer *UDPMultiplexer
+}
+
+func _New_MultiplexServer(multiplexer *UDPMultiplexer) (*_MultiplexServer, error) {
+ m := &_MultiplexServer{
+ multiplexer: multiplexer,
+ }
+ var err error
+ m.Server, err = NewServer(nil, m)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating server")
+ }
+ return m, nil
+}
+
+func (m *_MultiplexServer) Indication(pdu spi.Message) error {
+ return m.multiplexer.Indication(pdu)
+}
+
+type UDPMultiplexer struct {
+ address *net.UDPAddr
+ selfBroadcastAddress *net.UDPAddr
+ direct *_MultiplexClient
+ directPort *UDPDirector
+ broadcast *_MultiplexClient
+ broadcastPort *UDPDirector
+ annexH *_MultiplexServer
+ annexJ *_MultiplexServer
}
-func NewUDPMultiplexer(address interface{}, noBroadcast bool) (*UDPMultiplexer, error) {
+func NewUDPMultiplexer(address net.Addr, noBroadcast bool) (*UDPMultiplexer, error) {
log.Debug().Msgf("NewUDPMultiplexer %v noBroadcast=%t", address, noBroadcast)
u := &UDPMultiplexer{}
- // TODO: plumb later
+ // check for some options
+ specialBroadcast := false
+ if address == nil {
+ u.address = &net.UDPAddr{IP: nil, Port: 46808}
+ u.selfBroadcastAddress = &net.UDPAddr{IP: net.IPv4(255, 255, 255, 255), Port: 47808}
+ } else {
+ udpAddr, err := net.ResolveUDPAddr("udp", address.String())
+ if err != nil {
+ return nil, errors.Wrap(err, "error resolving upd")
+ }
+ u.address = udpAddr
+ // TODO: we need to find a way to resolved broadcast
+ u.selfBroadcastAddress = &net.UDPAddr{IP: net.IPv4(255, 255, 255, 255), Port: 47808}
+ }
+
+ log.Debug().Msgf("address %s, broadcast %s", u.address, u.selfBroadcastAddress)
+
+ // create and bind direct address
+ var err error
+ u.direct, err = _New_MultiplexClient(u)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating multiplex client")
+ }
+ u.directPort, err = NewUDPDirector(u.address, nil, nil, nil, nil)
+ if err := bind(u.direct, u.directPort); err != nil {
+ return nil, errors.Wrap(err, "error binding ports")
+ }
+
+ // create and bind the broadcast address for non-Windows
+ if specialBroadcast && !noBroadcast {
+ u.broadcast, err = _New_MultiplexClient(u)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating broadcast multiplex client")
+ }
+ u.broadcastPort, err = NewUDPDirector(u.selfBroadcastAddress, nil, nil, nil, nil)
+ if err := bind(u.direct, u.directPort); err != nil {
+ return nil, errors.Wrap(err, "error binding ports")
+ }
+ }
+
+ // create and bind the Annex H and J servers
+ u.annexH, err = _New_MultiplexServer(u)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating annexH")
+ }
+ u.annexJ, err = _New_MultiplexServer(u)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating annexJ")
+ }
return u, nil
}
+func (m *UDPMultiplexer) Close() error {
+ panic("implement me")
+}
+
+func (m *UDPMultiplexer) Confirmation(pdu spi.Message) error {
+ panic("implement me")
+}
+
+func (m *UDPMultiplexer) Indication(pdu spi.Message) error {
+ panic("implement me")
+}
+
type AnnexJCodec struct {
*Client
*Server
@@ -62,11 +168,11 @@ func NewAnnexJCodec(cid *int, sid *int) (*AnnexJCodec, error) {
return a, nil
}
-func (b *AnnexJCodec) Indication(apdu readWriteModel.APDU) error {
+func (b *AnnexJCodec) Indication(apdu spi.Message) error {
panic("we need to implement this with generics as we handle npdu not apdu here")
}
-func (b *AnnexJCodec) Confirmation(apdu readWriteModel.APDU) error {
+func (b *AnnexJCodec) Confirmation(apdu spi.Message) error {
panic("we need to implement this with generics as we handle npdu not apdu here")
}
@@ -92,18 +198,24 @@ func NewBIPSAP(sapID *int, rootStruct _BIPSAP) (*BIPSAP, error) {
return b, nil
}
-func (b *BIPSAP) SapIndication(apdu readWriteModel.APDU, pduDestination []byte) error {
- log.Debug().Msgf("SapIndication\n%s\n%s", apdu, pduDestination)
+func (b *BIPSAP) SapIndication(pdu spi.Message) error {
+ // TODO: extract from somewhere
+ var pduDestination []byte
+ panic("we need pduDestination")
+ log.Debug().Msgf("SapIndication\n%s\n%s", pdu, pduDestination)
// TODO: what to do with the destination?
// this is a request initiated by the ASE, send this downstream
- return b.rootStruct.Request(apdu)
+ return b.rootStruct.Request(pdu)
}
-func (b *BIPSAP) SapConfirmation(apdu readWriteModel.APDU, pduDestination []byte) error {
- log.Debug().Msgf("SapConfirmation\n%s\n%s", apdu, pduDestination)
+func (b *BIPSAP) SapConfirmation(pdu spi.Message) error {
+ // TODO: extract from somewhere
+ var pduDestination []byte
+ panic("we need pduDestination")
+ log.Debug().Msgf("SapConfirmation\n%s\n%s", pdu, pduDestination)
// TODO: what to do with the destination?
// this is a response from the ASE, send this downstream
- return b.rootStruct.Request(apdu)
+ return b.rootStruct.Request(pdu)
}
type BIPSimple struct {
@@ -133,10 +245,10 @@ func NewBIPSimple(sapID *int, cid *int, sid *int) (*BIPSimple, error) {
return b, nil
}
-func (b *BIPSimple) Indication(apdu readWriteModel.APDU) error {
+func (b *BIPSimple) Indication(apdu spi.Message) error {
panic("we need to implement this with generics as we handle npdu not apdu here")
}
-func (b *BIPSimple) Response(apdu readWriteModel.APDU) error {
+func (b *BIPSimple) Response(apdu spi.Message) error {
panic("we need to implement this with generics as we handle npdu not apdu here")
}
diff --git a/plc4go/internal/bacnetip/CommunicationsModule.go b/plc4go/internal/bacnetip/CommunicationsModule.go
index f92cf24a8e..46b34c9aa1 100644
--- a/plc4go/internal/bacnetip/CommunicationsModule.go
+++ b/plc4go/internal/bacnetip/CommunicationsModule.go
@@ -20,7 +20,7 @@
package bacnetip
import (
- readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "github.com/apache/plc4x/plc4go/spi"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
)
@@ -42,8 +42,8 @@ func init() {
// _Client is an interface used for documentation
type _Client interface {
- Request(apdu readWriteModel.APDU) error
- Confirmation(apdu readWriteModel.APDU) error
+ Request(pdu spi.Message) error
+ Confirmation(pdu spi.Message) error
_setClientPeer(server _Server)
}
@@ -78,16 +78,16 @@ func NewClient(cid *int, rootStruct _Client) (*Client, error) {
return c, nil
}
-func (c *Client) Request(apdu readWriteModel.APDU) error {
- log.Debug().Msgf("request\n%s", apdu)
+func (c *Client) Request(pdu spi.Message) error {
+ log.Debug().Msgf("request\n%s", pdu)
if c.clientPeer == nil {
return errors.New("unbound client")
}
- return c.clientPeer.Indication(apdu)
+ return c.clientPeer.Indication(pdu)
}
-func (c *Client) Confirmation(readWriteModel.APDU) error {
+func (c *Client) Confirmation(spi.Message) error {
panic("this should be implemented by outer struct")
}
@@ -97,8 +97,8 @@ func (c *Client) _setClientPeer(server _Server) {
// _Server is an interface used for documentation
type _Server interface {
- Indication(apdu readWriteModel.APDU) error
- Response(apdu readWriteModel.APDU) error
+ Indication(pdu spi.Message) error
+ Response(pdu spi.Message) error
_setServerPeer(serverPeer _Client)
}
@@ -133,17 +133,17 @@ func NewServer(sid *int, rootStruct _Server) (*Server, error) {
return s, nil
}
-func (s *Server) Indication(readWriteModel.APDU) error {
+func (s *Server) Indication(spi.Message) error {
panic("this should be implemented by outer struct")
}
-func (s *Server) Response(apdu readWriteModel.APDU) error {
- log.Debug().Msgf("response\n%s", apdu)
+func (s *Server) Response(pdu spi.Message) error {
+ log.Debug().Msgf("response\n%s", pdu)
if s.serverPeer == nil {
return errors.New("unbound server")
}
- return s.serverPeer.Confirmation(apdu)
+ return s.serverPeer.Confirmation(pdu)
}
func (s *Server) _setServerPeer(serverPeer _Client) {
@@ -152,10 +152,10 @@ func (s *Server) _setServerPeer(serverPeer _Client) {
// _ServiceAccessPoint is a interface used for documentation
type _ServiceAccessPoint interface {
- SapConfirmation(apdu readWriteModel.APDU, pduDestination []byte) error
- SapRequest(apdu readWriteModel.APDU) error
- SapIndication(apdu readWriteModel.APDU, pduDestination []byte) error
- SapResponse(apdu readWriteModel.APDU) error
+ SapConfirmation(pdu spi.Message) error
+ SapRequest(pdu spi.Message) error
+ SapIndication(pdu spi.Message) error
+ SapResponse(pdu spi.Message) error
_setServiceElement(serviceElement _ApplicationServiceElement)
}
@@ -189,29 +189,29 @@ func NewServiceAccessPoint(sapID *int, rootStruct _ServiceAccessPoint) (*Service
return s, nil
}
-func (s *ServiceAccessPoint) SapRequest(apdu readWriteModel.APDU) error {
- log.Debug().Msgf("SapRequest(%d)\n%s", s.serviceID, apdu)
+func (s *ServiceAccessPoint) SapRequest(pdu spi.Message) error {
+ log.Debug().Msgf("SapRequest(%d)\n%s", s.serviceID, pdu)
if s.serviceElement == nil {
return errors.New("unbound service access point")
}
- return s.serviceElement.Indication(apdu)
+ return s.serviceElement.Indication(pdu)
}
-func (s *ServiceAccessPoint) SapIndication(readWriteModel.APDU, []byte) error {
+func (s *ServiceAccessPoint) SapIndication(spi.Message) error {
panic("this should be implemented by outer struct")
}
-func (s *ServiceAccessPoint) SapResponse(apdu readWriteModel.APDU) error {
- log.Debug().Msgf("SapResponse(%d)\n%s", s.serviceID, apdu)
+func (s *ServiceAccessPoint) SapResponse(pdu spi.Message) error {
+ log.Debug().Msgf("SapResponse(%d)\n%s", s.serviceID, pdu)
if s.serviceElement == nil {
return errors.New("unbound service access point")
}
- return s.serviceElement.Confirmation(apdu)
+ return s.serviceElement.Confirmation(pdu)
}
-func (s *ServiceAccessPoint) SapConfirmation(readWriteModel.APDU, []byte) error {
+func (s *ServiceAccessPoint) SapConfirmation(spi.Message) error {
panic("this should be implemented by outer struct")
}
@@ -221,10 +221,10 @@ func (s *ServiceAccessPoint) _setServiceElement(serviceElement _ApplicationServi
// _ApplicationServiceElement is a interface used for documentation
type _ApplicationServiceElement interface {
- Request(apdu readWriteModel.APDU) error
- Indication(apdu readWriteModel.APDU) error
- Response(apdu readWriteModel.APDU) error
- Confirmation(apdu readWriteModel.APDU) error
+ Request(pdu spi.Message) error
+ Indication(pdu spi.Message) error
+ Response(pdu spi.Message) error
+ Confirmation(pdu spi.Message) error
_setElementService(elementService _ServiceAccessPoint)
}
@@ -259,31 +259,31 @@ func NewApplicationServiceElement(aseID *int, rootStruct _ApplicationServiceElem
return a, nil
}
-func (a *ApplicationServiceElement) Request(apdu readWriteModel.APDU) error {
- log.Debug().Msgf("Request\n%s", apdu)
+func (a *ApplicationServiceElement) Request(pdu spi.Message) error {
+ log.Debug().Msgf("Request\n%s", pdu)
if a.elementService == nil {
return errors.New("unbound application service element")
}
- return a.elementService.SapIndication(apdu, nil) // TODO: where to get the source from
+ return a.elementService.SapIndication(pdu)
}
-func (a *ApplicationServiceElement) Indication(apdu readWriteModel.APDU) error {
+func (a *ApplicationServiceElement) Indication(spi.Message) error {
panic("this should be implemented by outer struct")
}
-func (a *ApplicationServiceElement) Response(apdu readWriteModel.APDU) error {
- log.Debug().Msgf("Response\n%s", apdu)
+func (a *ApplicationServiceElement) Response(pdu spi.Message) error {
+ log.Debug().Msgf("Response\n%s", pdu)
if a.elementService == nil {
return errors.New("unbound application service element")
}
- return a.elementService.SapConfirmation(apdu, nil) // TODO: where to get the source from
+ return a.elementService.SapConfirmation(pdu)
}
-func (a *ApplicationServiceElement) Confirmation(apdu readWriteModel.APDU) error {
+func (a *ApplicationServiceElement) Confirmation(spi.Message) error {
panic("this should be implemented by outer struct")
}
@@ -365,7 +365,7 @@ func bind(args ...interface{}) error {
}
// go through the argument pairs
- for i := 0; i < len(args); i++ {
+ for i := 0; i < len(args)-1; i++ {
client := args[i]
log.Debug().Msgf("client %v", client)
server := args[i+1]
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 641a81ad37..dfccafe687 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -89,7 +89,14 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
return ch
}
- codec, _ := m.applicationManager.getApplicationLayerMessageCode(udpTransport, transportUrl, options)
+ codec, err := m.applicationManager.getApplicationLayerMessageCodec(udpTransport, transportUrl, options)
+ if err != nil {
+ ch := make(chan plc4go.PlcConnectionConnectResult)
+ go func() {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "error getting application layer message codec"))
+ }()
+ return ch
+ }
log.Debug().Msgf("working with codec %#v", codec)
// Create the new connection
@@ -115,7 +122,7 @@ type ApplicationManager struct {
applications map[string]*ApplicationLayerMessageCodec
}
-func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (*ApplicationLayerMessageCodec, error) {
+func (a *ApplicationManager) getApplicationLayerMessageCodec(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (*ApplicationLayerMessageCodec, error) {
var localAddress *net.UDPAddr
var remoteAddr *net.UDPAddr
{
diff --git a/plc4go/internal/bacnetip/IOCBModule.go b/plc4go/internal/bacnetip/IOCBModule.go
index 068ed17818..e51fa8456c 100644
--- a/plc4go/internal/bacnetip/IOCBModule.go
+++ b/plc4go/internal/bacnetip/IOCBModule.go
@@ -1,8 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package bacnetip
import (
"container/heap"
- readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "fmt"
+ "github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/plcerrors"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
@@ -64,10 +84,10 @@ type _IOCB interface {
setIOController(ioController _IOController)
setIOState(newState IOCBState)
getIOState() IOCBState
- setIOResponse(msg readWriteModel.APDU)
+ setIOResponse(msg spi.Message)
Trigger()
setIOError(err error)
- getRequest() readWriteModel.APDU
+ getRequest() spi.Message
getDestination() net.Addr
getPriority() int
clearQueue()
@@ -79,10 +99,10 @@ var _identLock sync.Mutex
type IOCB struct {
ioID int
- request readWriteModel.APDU
+ request spi.Message
destination net.Addr
ioState IOCBState
- ioResponse readWriteModel.APDU
+ ioResponse spi.Message
ioError error
ioController _IOController
ioComplete sync.Cond
@@ -94,7 +114,7 @@ type IOCB struct {
priority int
}
-func NewIOCB(request readWriteModel.APDU, destination net.Addr) (*IOCB, error) {
+func NewIOCB(request spi.Message, destination net.Addr) (*IOCB, error) {
// lock the identity sequence number
_identLock.Lock()
@@ -174,7 +194,7 @@ func (i *IOCB) Trigger() {
// Complete Called to complete a transaction, usually when ProcessIO has shipped the IOCB off to some other thread or
// function.
-func (i *IOCB) Complete(apdu readWriteModel.APDU) error {
+func (i *IOCB) Complete(apdu spi.Message) error {
log.Debug().Msgf("Complete(%d)\n%s", i.ioID, apdu)
if i.ioController != nil {
@@ -237,7 +257,7 @@ func (i *IOCB) getIOState() IOCBState {
return i.ioState
}
-func (i *IOCB) setIOResponse(msg readWriteModel.APDU) {
+func (i *IOCB) setIOResponse(msg spi.Message) {
i.ioResponse = msg
}
@@ -245,7 +265,7 @@ func (i *IOCB) setIOError(err error) {
i.ioError = err
}
-func (i *IOCB) getRequest() readWriteModel.APDU {
+func (i *IOCB) getRequest() spi.Message {
return i.request
}
@@ -414,7 +434,7 @@ func (i *IOQueue) Abort(err error) {
type _IOController interface {
Abort(err error) error
ProcessIO(iocb _IOCB) error
- CompleteIO(iocb _IOCB, apdu readWriteModel.APDU) error
+ CompleteIO(iocb _IOCB, pdu spi.Message) error
AbortIO(iocb _IOCB, err error) error
}
@@ -461,7 +481,7 @@ func (i *IOController) RequestIO(iocb _IOCB) error {
}
// ProcessIO Figure out how to respond to this request. This must be provided by the derived class.
-func (i *IOController) ProcessIO(iocb _IOCB) error {
+func (i *IOController) ProcessIO(_IOCB) error {
return errors.New("IOController must implement process_io()")
}
@@ -480,7 +500,7 @@ func (i *IOController) ActiveIO(iocb _IOCB) error {
}
// CompleteIO Called by a handler to return data to the client
-func (i *IOController) CompleteIO(iocb _IOCB, apdu readWriteModel.APDU) error {
+func (i *IOController) CompleteIO(iocb _IOCB, apdu spi.Message) error {
log.Debug().Msgf("CompleteIO %s\n%s", iocb, apdu)
// if it completed, leave it alone
@@ -559,11 +579,11 @@ func NewIOQController(name string) (*IOQController, error) {
type SieveQueue struct {
*IOQController
- requestFn func(apdu readWriteModel.APDU)
+ requestFn func(apdu spi.Message)
address net.Addr
}
-func NewSieveQueue(fn func(apdu readWriteModel.APDU), address net.Addr) (*SieveQueue, error) {
+func NewSieveQueue(fn func(apdu spi.Message), address net.Addr) (*SieveQueue, error) {
s := &SieveQueue{}
var err error
s.IOQController, err = NewIOQController(address.String())
@@ -587,3 +607,7 @@ func (s *SieveQueue) ProcessIO(iocb _IOCB) error {
s.requestFn(iocb.getRequest())
return nil
}
+
+func (s *SieveQueue) String() string {
+ return fmt.Sprintf("%#q", s)
+}
diff --git a/plc4go/internal/bacnetip/NetworkService.go b/plc4go/internal/bacnetip/NetworkService.go
index a92045d037..ce196f858d 100644
--- a/plc4go/internal/bacnetip/NetworkService.go
+++ b/plc4go/internal/bacnetip/NetworkService.go
@@ -19,24 +19,153 @@
package bacnetip
+import (
+ "github.com/apache/plc4x/plc4go/spi"
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog/log"
+ "net"
+)
+
+// TODO: implement me
+type NetworkAdapter struct {
+ *Client
+ adapterSAP *NetworkServiceAccessPoint
+ adapterNet interface{}
+ adapterAddr net.Addr
+ adapterNetConfigured *int
+}
+
+func NewNetworkAdapter(sap *NetworkServiceAccessPoint, net interface{}, addr net.Addr, cid *int) (*NetworkAdapter, error) {
+ n := &NetworkAdapter{
+ adapterSAP: sap,
+ adapterNet: net,
+ adapterAddr: addr,
+ }
+ var err error
+ n.Client, err = NewClient(cid, n)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating client")
+ }
+ // record if this was 0=learned, 1=configured, None=unknown
+ if net != nil {
+ var state = 1
+ n.adapterNetConfigured = &state
+ }
+ return n, nil
+}
+
+// Confirmation Decode upstream PDUs and pass them up to the service access point.
+func (n *NetworkAdapter) Confirmation(npdu spi.Message) error {
+ log.Debug().Msgf("confirmation\n%s\n%s", npdu, n.adapterNet)
+
+ // TODO: we need generics otherwise this won't work at all here
+ return n.adapterSAP.ProcessNPDU(npdu)
+}
+
+// ProcessNPDU Encode NPDUs from the service access point and send them downstream.
+func (n *NetworkAdapter) ProcessNPDU(npdu spi.Message) error {
+ log.Debug().Msgf("ProcessNPDU\n%s\n(net=%s)", npdu, n.adapterNet)
+ return n.Request(npdu)
+}
+
type NetworkServiceAccessPoint struct {
- // TODO: implement me
+ *ServiceAccessPoint
+ *Server
+ adapters map[string]*NetworkAdapter
+ routerInfoCache interface{}
+ pendingNets map[string]interface{}
+ localAdapter interface{}
}
-func NewNetworkServiceAccessPoint() (*NetworkServiceAccessPoint, error) {
- // TODO: implement me
- return nil, nil
+func NewNetworkServiceAccessPoint(routerInfoCache interface{}, sapID *int, sid *int) (*NetworkServiceAccessPoint, error) {
+ n := &NetworkServiceAccessPoint{}
+ var err error
+ n.ServiceAccessPoint, err = NewServiceAccessPoint(sapID, n)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating network service access point")
+ }
+ n.Server, err = NewServer(sid, n)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating server")
+ }
+
+ // map of directly connected networks
+ n.adapters = make(map[string]*NetworkAdapter)
+
+ // use the provided cache or make a default one
+ if routerInfoCache == nil {
+ // TODO: create a new cache
+ }
+ n.routerInfoCache = routerInfoCache
+
+ // map to a list of application layer packets waiting for a path
+ n.pendingNets = make(map[string]interface{})
+
+ return n, nil
}
func (n *NetworkServiceAccessPoint) bind(server _Server, net interface{}, address interface{}) error {
panic("not implemented yet")
}
+func (n *NetworkServiceAccessPoint) UpdateRouterReference() error {
+ panic("not implemented yet")
+}
+
+func (n *NetworkServiceAccessPoint) DeleteRouterReference() error {
+ panic("not implemented yet")
+}
+
+func (n *NetworkServiceAccessPoint) Indication(npdu spi.Message) error {
+ panic("not implemented yet")
+}
+
+func (n *NetworkServiceAccessPoint) ProcessNPDU(npdu spi.Message) error {
+ panic("not implemented yet")
+}
+
+func (n *NetworkServiceAccessPoint) SapIndication(npdu spi.Message) error {
+ // TODO: extract from somewhere
+ var pduDestination []byte
+ panic("we need pduDestination")
+ _ = pduDestination
+ panic("not implemented yet")
+}
+
+func (n *NetworkServiceAccessPoint) SapConfirmation(npdu spi.Message) error {
+ // TODO: extract from somewhere
+ var pduDestination []byte
+ panic("we need pduDestination")
+ _ = pduDestination
+ panic("not implemented yet")
+}
+
type NetworkServiceElement struct {
+ *ApplicationServiceElement
+
// TODO: implement me
}
-func NewNetworkServiceElement() (*NetworkServiceElement, error) {
- // TODO: implement me
- return nil, nil
+func NewNetworkServiceElement(eid *int) (*NetworkServiceElement, error) {
+ n := &NetworkServiceElement{}
+ var err error
+ n.ApplicationServiceElement, err = NewApplicationServiceElement(eid, n)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating application service element")
+ }
+
+ // TODO: we need to use defer from package as this go routine is too early
+ go n.Startup()
+ return n, nil
+}
+
+func (n *NetworkServiceElement) Startup() {
+ log.Debug().Msg("Startup")
+
+ // reference the service access point
+ sap := n.elementService.(*NetworkServiceAccessPoint) // TODO: hard cast but seems like adapters apears first in network service access point (so hard binding)
+ log.Debug().Msgf("sap: %v", sap)
+
+ // loop through all the adapters
+ // TODO: no adapters yet
}
diff --git a/plc4go/internal/bacnetip/NetworkService.go b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
similarity index 57%
copy from plc4go/internal/bacnetip/NetworkService.go
copy to plc4go/internal/bacnetip/UDPCommunicationsModule.go
index a92045d037..aeed7863c6 100644
--- a/plc4go/internal/bacnetip/NetworkService.go
+++ b/plc4go/internal/bacnetip/UDPCommunicationsModule.go
@@ -19,24 +19,36 @@
package bacnetip
-type NetworkServiceAccessPoint struct {
- // TODO: implement me
-}
+import (
+ "github.com/pkg/errors"
+ "net"
+)
-func NewNetworkServiceAccessPoint() (*NetworkServiceAccessPoint, error) {
- // TODO: implement me
- return nil, nil
+type UDPActor struct {
}
-func (n *NetworkServiceAccessPoint) bind(server _Server, net interface{}, address interface{}) error {
- panic("not implemented yet")
+type UDPPickleActor struct {
+ *UDPActor
}
-type NetworkServiceElement struct {
- // TODO: implement me
+// TODO: finish me
+type UDPDirector struct {
+ *Server
+ *ServiceAccessPoint
}
-func NewNetworkServiceElement() (*NetworkServiceElement, error) {
- // TODO: implement me
- return nil, nil
+func NewUDPDirector(address net.Addr, timeout *int, reuse *bool, sid *int, sapID *int) (*UDPDirector, error) {
+ u := &UDPDirector{}
+ var err error
+ u.Server, err = NewServer(sid, u)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating server")
+ }
+ u.ServiceAccessPoint, err = NewServiceAccessPoint(sapID, u)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating service access point")
+ }
+
+ // TODO: finish this
+ return u, nil
}