You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/15 15:28:14 UTC
[plc4x] 02/02: refactor(plc4go/bacnet): added more application code for protocol
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 25c5e941636f66d19ae70a0704a67362a24cd51f
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Nov 15 16:28:00 2022 +0100
refactor(plc4go/bacnet): added more application code for protocol
---
plc4go/internal/bacnetip/ApplicationLayer.go | 137 +++---
plc4go/internal/bacnetip/ApplicationModule.go | 219 +++++++++-
plc4go/internal/bacnetip/Device.go | 63 ---
plc4go/internal/bacnetip/Driver.go | 8 +-
plc4go/internal/bacnetip/IOCBModule.go | 589 ++++++++++++++++++++++++++
plc4go/internal/bacnetip/MessageCodec.go | 30 +-
plc4go/internal/bacnetip/local/Device.go | 36 ++
plc4go/internal/bacnetip/service/Device.go | 28 ++
plc4go/internal/bacnetip/service/Object.go | 28 ++
9 files changed, 987 insertions(+), 151 deletions(-)
diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go
index 04dc50f415..d7a5ce94f3 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -21,6 +21,7 @@ package bacnetip
import (
"bytes"
+ "github.com/apache/plc4x/plc4go/internal/bacnetip/local"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi/utils"
"github.com/pkg/errors"
@@ -31,33 +32,33 @@ import (
type SSMState uint8
const (
- IDLE SSMState = iota
- SEGMENTED_REQUEST
- AWAIT_CONFIRMATION
- AWAIT_RESPONSE
- SEGMENTED_RESPONSE
- SEGMENTED_CONFIRMATION
- COMPLETED
- ABORTED
+ SSMState_IDLE SSMState = iota
+ SSMState_SEGMENTED_REQUEST
+ SSMState_AWAIT_CONFIRMATION
+ SSMState_AWAIT_RESPONSE
+ SSMState_SEGMENTED_RESPONSE
+ SSMState_SEGMENTED_CONFIRMATION
+ SSMState_COMPLETED
+ SSMState_ABORTED
)
func (s SSMState) String() string {
switch s {
- case IDLE:
+ case SSMState_IDLE:
return "IDLE"
- case SEGMENTED_REQUEST:
+ case SSMState_SEGMENTED_REQUEST:
return "SEGMENTED_REQUEST"
- case AWAIT_CONFIRMATION:
+ case SSMState_AWAIT_CONFIRMATION:
return "AWAIT_CONFIRMATION"
- case AWAIT_RESPONSE:
+ case SSMState_AWAIT_RESPONSE:
return "AWAIT_RESPONSE"
- case SEGMENTED_RESPONSE:
+ case SSMState_SEGMENTED_RESPONSE:
return "SEGMENTED_RESPONSE"
- case SEGMENTED_CONFIRMATION:
+ case SSMState_SEGMENTED_CONFIRMATION:
return "SEGMENTED_CONFIRMATION"
- case COMPLETED:
+ case SSMState_COMPLETED:
return "COMPLETED"
- case ABORTED:
+ case SSMState_ABORTED:
return "ABORTED"
default:
return "Unknown"
@@ -76,7 +77,7 @@ type SSMSAPRequirements interface {
_ServiceAccessPoint
_Client
GetDeviceInfoCache() *DeviceInfoCache
- GetLocalDevice() LocalDeviceObject
+ GetLocalDevice() *local.LocalDeviceObject
GetProposedWindowSize() uint8
GetClientTransactions() []*ClientSSM
GetServerTransactions() []*ServerSSM
@@ -126,7 +127,7 @@ func NewSSM(sap SSMSAPRequirements, pduAddress []byte) (SSM, error) {
ssmSAP: sap,
pduAddress: pduAddress,
deviceInfo: deviceInfo,
- state: IDLE,
+ state: SSMState_IDLE,
numberOfApduRetries: localDevice.NumberOfAPDURetries,
apduTimeout: localDevice.APDUTimeout,
segmentationSupported: localDevice.SegmentationSupported,
@@ -163,7 +164,7 @@ func (s *SSM) restartTimer(millis uint) {
// setState This function is called when the derived class wants to change state
func (s *SSM) setState(newState SSMState, timer *uint) error {
log.Debug().Msgf("setState %s timer=%d", newState, timer)
- if s.state == COMPLETED || s.state == ABORTED {
+ if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
return errors.Errorf("Invalid state transition from %s to %s", s.state, newState)
}
@@ -360,7 +361,7 @@ func (s *ClientSSM) setState(newState SSMState, timer *uint) error {
return errors.Wrap(err, "error during SSM state transition")
}
- if s.state == COMPLETED || s.state == ABORTED {
+ if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
log.Debug().Msg("remove from active transaction")
s.ssmSAP.GetClientTransactions() // TODO remove "this" transaction from the list
if s.deviceInfo == nil {
@@ -458,7 +459,7 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
// unsegmented
s.sentAllSegments = true
s.retryCount = 0
- if err := s.setState(AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
+ if err := s.setState(SSMState_AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
return errors.Wrap(err, "error switching state")
}
} else {
@@ -468,7 +469,7 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
s.segmentRetryCount = 0
s.initialSequenceNumber = 0
s.actualWindowSize = nil
- if err := s.setState(SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
+ if err := s.setState(SSMState_SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
return errors.Wrap(err, "error switching state")
}
}
@@ -497,11 +498,11 @@ func (s *ClientSSM) Confirmation(apdu readWriteModel.APDU) error {
log.Debug().Msgf("confirmation\n%s", apdu)
switch s.state {
- case SEGMENTED_REQUEST:
+ case SSMState_SEGMENTED_REQUEST:
return s.segmentedRequest(apdu)
- case AWAIT_CONFIRMATION:
+ case SSMState_AWAIT_CONFIRMATION:
return s.awaitConfirmation(apdu)
- case SEGMENTED_CONFIRMATION:
+ case SSMState_SEGMENTED_CONFIRMATION:
return s.segmentedConfirmation(apdu)
default:
return errors.Errorf("Invalid state %s", s.state)
@@ -512,13 +513,13 @@ func (s *ClientSSM) Confirmation(apdu readWriteModel.APDU) error {
func (s *ClientSSM) processTask() error {
log.Debug().Msg("processTask")
switch s.state {
- case SEGMENTED_REQUEST:
+ case SSMState_SEGMENTED_REQUEST:
return s.segmentedRequestTimeout()
- case AWAIT_CONFIRMATION:
+ case SSMState_AWAIT_CONFIRMATION:
return s.awaitConfirmationTimeout()
- case SEGMENTED_CONFIRMATION:
+ case SSMState_SEGMENTED_CONFIRMATION:
return s.segmentedConfirmationTimeout()
- case COMPLETED, ABORTED:
+ case SSMState_COMPLETED, SSMState_ABORTED:
return nil
default:
return errors.Errorf("Invalid state %s", s.state)
@@ -530,7 +531,7 @@ func (s *ClientSSM) abort(reason readWriteModel.BACnetAbortReason) (readWriteMod
log.Debug().Msgf("abort\n%s", reason)
// change the state to aborted
- if err := s.setState(ABORTED, nil); err != nil {
+ if err := s.setState(SSMState_ABORTED, nil); err != nil {
return nil, errors.Wrap(err, "Error setting state to aborted")
}
@@ -558,7 +559,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
} else if s.sentAllSegments {
log.Debug().Msg("all done sending request")
- if err := s.setState(AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
+ if err := s.setState(SSMState_AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
return errors.Wrap(err, "error switching state")
}
} else {
@@ -587,7 +588,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
log.Debug().Err(err).Msg("error sending response")
}
} else {
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "error switching state")
}
}
@@ -607,7 +608,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
}
} else if !apdu.GetSegmentedMessage() {
// ack is not segmented
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "error switching state")
}
if err := s.Response(apdu); err != nil {
@@ -624,13 +625,13 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
s.actualWindowSize = &minWindowSize
s.lastSequenceNumber = 0
s.initialSequenceNumber = 0
- if err := s.setState(SEGMENTED_CONFIRMATION, &s.segmentTimeout); err != nil {
+ if err := s.setState(SSMState_SEGMENTED_CONFIRMATION, &s.segmentTimeout); err != nil {
return errors.Wrap(err, "error switching state")
}
}
case readWriteModel.APDUErrorExactly:
log.Debug().Msg("error/reject/abort")
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "error switching state")
}
if err := s.Response(apdu); err != nil {
@@ -685,7 +686,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
case readWriteModel.APDUAbortExactly:
log.Debug().Msg("Server aborted")
- if err := s.setState(ABORTED, nil); err != nil {
+ if err := s.setState(SSMState_ABORTED, nil); err != nil {
return errors.Wrap(err, "error switching state")
}
if err := s.Response(apdu); err != nil {
@@ -694,7 +695,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly:
log.Debug().Msg("simple ack, error or reject")
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "error switching state")
}
if err := s.Response(apdu); err != nil {
@@ -706,7 +707,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
if !apdu.GetSegmentedMessage() {
log.Debug().Msg("unsegmented")
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "error switching state")
}
if err := s.Response(apdu); err != nil {
@@ -733,7 +734,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
s.actualWindowSize = apdu.GetProposedWindowSize()
s.lastSequenceNumber = 0
s.initialSequenceNumber = 0
- if err := s.setState(SEGMENTED_CONFIRMATION, nil); err != nil {
+ if err := s.setState(SSMState_SEGMENTED_CONFIRMATION, nil); err != nil {
return errors.Wrap(err, "error switching state")
}
@@ -858,7 +859,7 @@ func (s *ClientSSM) segmentedConfirmation(apdu readWriteModel.APDU) error {
log.Debug().Err(err).Msg("error sending request")
}
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "error switching state")
}
// TODO: this is nonsense... We need to parse the service and the apdu not sure where to get it from now...
@@ -928,7 +929,7 @@ func (s *ServerSSM) setState(newState SSMState, timer *uint) error {
return errors.Wrap(err, "error during SSM state transition")
}
- if s.state == COMPLETED || s.state == ABORTED {
+ if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
log.Debug().Msg("remove from active transaction")
s.ssmSAP.GetServerTransactions() // TODO remove "this" transaction from the list
if s.deviceInfo != nil {
@@ -954,13 +955,13 @@ func (s *ServerSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
// make sure we're getting confirmed requests
switch s.state {
- case IDLE:
+ case SSMState_IDLE:
return s.idle(apdu)
- case SEGMENTED_REQUEST:
+ case SSMState_SEGMENTED_REQUEST:
return s.segmentedRequest(apdu)
- case AWAIT_RESPONSE:
+ case SSMState_AWAIT_RESPONSE:
return s.awaitResponse(apdu)
- case SEGMENTED_RESPONSE:
+ case SSMState_SEGMENTED_RESPONSE:
return s.segmentedResponse(apdu)
default:
return errors.Errorf("invalid state %s", s.state)
@@ -984,7 +985,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
log.Debug().Msgf("confirmation\n%s", apdu)
// check to see we are in the correct state
- if s.state != AWAIT_RESPONSE {
+ if s.state != SSMState_AWAIT_RESPONSE {
log.Debug().Msg("warning: no expecting a response")
}
@@ -993,7 +994,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
case readWriteModel.APDUAbortExactly:
log.Debug().Msg("abort")
- if err := s.setState(ABORTED, nil); err != nil {
+ if err := s.setState(SSMState_ABORTED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
@@ -1004,7 +1005,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
log.Debug().Msg("simple ack, error or reject")
// transaction completed
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
@@ -1086,7 +1087,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
if err := s.Response(apdu); err != nil {
log.Debug().Err(err).Msg("error sending response")
}
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
} else {
@@ -1097,7 +1098,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
if err := s.Response(segment); err != nil {
log.Debug().Err(err).Msg("error sending response")
}
- if err := s.setState(SEGMENTED_RESPONSE, nil); err != nil {
+ if err := s.setState(SSMState_SEGMENTED_RESPONSE, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
}
@@ -1114,13 +1115,13 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
func (s *ServerSSM) processTask() error {
log.Debug().Msg("processTask")
switch s.state {
- case SEGMENTED_REQUEST:
+ case SSMState_SEGMENTED_REQUEST:
return s.segmentedRequestTimeout()
- case AWAIT_CONFIRMATION:
+ case SSMState_AWAIT_CONFIRMATION:
return s.awaitResponseTimeout()
- case SEGMENTED_CONFIRMATION:
+ case SSMState_SEGMENTED_CONFIRMATION:
return s.segmentedResponseTimeout()
- case COMPLETED, ABORTED:
+ case SSMState_COMPLETED, SSMState_ABORTED:
return nil
default:
return errors.Errorf("Invalid state %s", s.state)
@@ -1132,7 +1133,7 @@ func (s *ServerSSM) abort(reason readWriteModel.BACnetAbortReason) (readWriteMod
log.Debug().Msgf("abort\n%s", reason)
// change the state to aborted
- if err := s.setState(ABORTED, nil); err != nil {
+ if err := s.setState(SSMState_ABORTED, nil); err != nil {
return nil, errors.Wrap(err, "Error setting state to aborted")
}
@@ -1200,7 +1201,7 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
// unsegmented request
if len(apduConfirmedRequest.GetSegment()) <= 0 {
- if err := s.setState(AWAIT_RESPONSE, nil); err != nil {
+ if err := s.setState(SSMState_AWAIT_RESPONSE, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
return s.Request(apdu)
@@ -1228,7 +1229,7 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
// initialize the state
s.lastSequenceNumber = 0
s.initialSequenceNumber = 0
- if err := s.setState(SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
+ if err := s.setState(SSMState_SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
@@ -1243,7 +1244,7 @@ func (s *ServerSSM) segmentedRequest(apdu readWriteModel.APDU) error {
// some kind of problem
if _, ok := apdu.(readWriteModel.APDUAbortExactly); ok {
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
return s.Response(apdu)
@@ -1312,7 +1313,7 @@ func (s *ServerSSM) segmentedRequest(apdu readWriteModel.APDU) error {
// forward the whole thing to the application
applicationTimeout := s.ssmSAP.GetApplicationTimeout()
- if err := s.setState(AWAIT_RESPONSE, &applicationTimeout); err != nil {
+ if err := s.setState(SSMState_AWAIT_RESPONSE, &applicationTimeout); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
// TODO: here we need to rebuild again yada yada
@@ -1348,7 +1349,7 @@ func (s *ServerSSM) segmentedRequestTimeout() error {
log.Debug().Msg("segmentedRequestTimeout")
// give up
- if err := s.setState(ABORTED, nil); err != nil {
+ if err := s.setState(SSMState_ABORTED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
return nil
@@ -1364,7 +1365,7 @@ func (s *ServerSSM) awaitResponse(apdu readWriteModel.APDU) error {
log.Debug().Msg("client aborting this request")
// forward to the application
- if err := s.setState(ABORTED, nil); err != nil {
+ if err := s.setState(SSMState_ABORTED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
if err := s.Request(apdu); err != nil { // send it ot the device
@@ -1410,7 +1411,7 @@ func (s *ServerSSM) segmentedResponse(apdu readWriteModel.APDU) error {
} else if s.sentAllSegments {
// final ack received?
log.Debug().Msg("all done sending response")
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
} else {
@@ -1427,7 +1428,7 @@ func (s *ServerSSM) segmentedResponse(apdu readWriteModel.APDU) error {
}
// some kind of problem
case readWriteModel.APDUAbortExactly:
- if err := s.setState(COMPLETED, nil); err != nil {
+ if err := s.setState(SSMState_COMPLETED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
if err := s.Response(apdu); err != nil { // send it ot the application
@@ -1451,7 +1452,7 @@ func (s *ServerSSM) segmentedResponseTimeout() error {
}
} else {
// five up
- if err := s.setState(ABORTED, nil); err != nil {
+ if err := s.setState(SSMState_ABORTED, nil); err != nil {
return errors.Wrap(err, "Error setting state to aborted")
}
}
@@ -1462,7 +1463,7 @@ type StateMachineAccessPoint struct {
*Client
*ServiceAccessPoint
- localDevice LocalDeviceObject
+ localDevice *local.LocalDeviceObject
deviceInventory *DeviceInfoCache
nextInvokeId uint8
clientTransactions []*ClientSSM
@@ -1478,7 +1479,7 @@ type StateMachineAccessPoint struct {
applicationTimeout uint
}
-func NewStateMachineAccessPoint(localDevice LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
+func NewStateMachineAccessPoint(localDevice *local.LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
log.Debug().Msgf("NewStateMachineAccessPoint localDevice=%v deviceInventory=%v sap=%v cid=%v", localDevice, deviceInventory, sapID, cid)
s := &StateMachineAccessPoint{
@@ -1795,7 +1796,7 @@ func (s *StateMachineAccessPoint) GetDeviceInfoCache() *DeviceInfoCache {
return s.deviceInventory
}
-func (s *StateMachineAccessPoint) GetLocalDevice() LocalDeviceObject {
+func (s *StateMachineAccessPoint) GetLocalDevice() *local.LocalDeviceObject {
return s.localDevice
}
@@ -1860,7 +1861,7 @@ func (a *ApplicationServiceAccessPoint) Indication(apdu readWriteModel.APDU) err
log.Debug().Err(errorFound).Msg("got error")
// TODO: map it to a error... code temporary placeholder
- a.Response(readWriteModel.NewAPDUReject(apdu.GetInvokeId(), nil, 0))
+ return a.Response(readWriteModel.NewAPDUReject(apdu.GetInvokeId(), nil, 0))
}
case readWriteModel.APDUUnconfirmedRequestExactly:
//assume no errors found
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 41d0107507..b974e4eeda 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -23,6 +23,8 @@ import (
"bytes"
"encoding/binary"
"fmt"
+ "github.com/apache/plc4x/plc4go/internal/bacnetip/local"
+ "github.com/apache/plc4x/plc4go/internal/bacnetip/service"
readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
@@ -199,30 +201,223 @@ func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
return nil
}
-// TODO: implement
+// TODO: finish
type Application struct {
- ApplicationServiceElement
+ *ApplicationServiceElement
Collector
+
+ objectName map[string]*local.LocalDeviceObject
+ objectIdentifier map[string]*local.LocalDeviceObject
+ localDevice *local.LocalDeviceObject
+ deviceInfoCache *DeviceInfoCache
+ controllers map[string]interface{}
+ helpers map[string]func(apdu readWriteModel.APDU) error
+}
+
+func NewApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*Application, error) {
+ log.Debug().Msgf("NewApplication %v %s deviceInfoCache=%v aseID=%d", localDevice, localAddress, deviceInfoCache, aseID)
+ a := &Application{}
+ var err error
+ a.ApplicationServiceElement, err = NewApplicationServiceElement(aseID, a)
+ if err != nil {
+ return nil, err
+ }
+
+ // local objects by ID and name
+ a.objectName = map[string]*local.LocalDeviceObject{}
+ a.objectIdentifier = map[string]*local.LocalDeviceObject{}
+
+ // keep track of the local device
+ if localDevice != nil {
+ a.localDevice = localDevice
+
+ // bind the device object to this application
+ localDevice.App = a
+
+ // local objects by ID and name
+ a.objectName[localDevice.ObjectName] = localDevice
+ a.objectName[localDevice.ObjectIdentifier] = localDevice
+ }
+
+ // use the provided cache or make a default one
+ if deviceInfoCache == nil {
+ var newDeviceInfoCache DeviceInfoCache
+ deviceInfoCache = &newDeviceInfoCache
+ }
+ a.deviceInfoCache = deviceInfoCache
+
+ // controllers for managing confirmed requests as a client
+ a.controllers = map[string]interface{}{}
+
+ // now set up the rest of the capabilities
+ a.Collector = Collector{}
+
+ // TODO: no idea how to handle the capabilities
+ return a, nil
+}
+
+func (a *Application) Request(apdu readWriteModel.APDU) error {
+ log.Debug().Msgf("Request\n%s", apdu)
+
+ // double-check the input is the right kind of APDU
+ switch apdu.(type) {
+ case readWriteModel.APDUUnconfirmedRequestExactly, readWriteModel.APDUConfirmedRequestExactly:
+ default:
+ return errors.New("APDU expected")
+ }
+ return a.ApplicationServiceElement.Request(apdu)
}
-// TODO: implement
-type IOController struct {
+func (a *Application) Indication(apdu readWriteModel.APDU) error {
+ log.Debug().Msgf("Indication\n%s", apdu)
+
+ // get a helper function
+ helperName := fmt.Sprintf("do_%T", apdu)
+ helperFn := a.helpers[helperName]
+ log.Debug().Msgf("helperFn: %s == %t", helperName, helperFn != nil)
+
+ // send back a reject for unrecognized services
+ if helperFn == nil {
+ if _, ok := apdu.(readWriteModel.APDUConfirmedRequestExactly); ok {
+ return errors.Errorf("no function %s", helperName)
+ }
+ return nil
+ }
+
+ if err := helperFn(apdu); err != nil {
+ log.Debug().Err(err).Msgf("err result")
+ // TODO: do proper mapping
+ a.Response(readWriteModel.NewAPDUError(0, readWriteModel.BACnetConfirmedServiceChoice_CREATE_OBJECT, nil, 0))
+ }
+
+ return nil
}
-// TODO: implement
+// TODO: finish
type ApplicationIOController struct {
- IOController
- Application
+ *IOController
+ *Application
+ queueByAddress map[string]SieveQueue
}
-func NewApplicationIOController(interface{}, interface{}, interface{}, *int) (*ApplicationIOController, error) {
- return &ApplicationIOController{}, nil
+func NewApplicationIOController(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*ApplicationIOController, error) {
+ a := &ApplicationIOController{
+ // queues for each address
+ queueByAddress: make(map[string]SieveQueue),
+ }
+ var err error
+ a.IOController, err = NewIOController("", a)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating io controller")
+ }
+ a.Application, err = NewApplication(localDevice, localAddress, deviceInfoCache, aseID)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating application")
+ }
+ return a, nil
+}
+
+func (a *ApplicationIOController) ProcessIO(iocb _IOCB) error {
+ log.Debug().Msgf("ProcessIO %s", iocb)
+
+ // get the destination address from the pdu
+ destinationAddress := iocb.getDestination()
+ log.Debug().Msgf("destinationAddress %s", destinationAddress)
+
+ // look up the queue
+ queue, ok := a.queueByAddress[destinationAddress.String()]
+ if !ok {
+ newQueue, _ := NewSieveQueue(a._AppRequest, destinationAddress)
+ queue = *newQueue
+ a.queueByAddress[destinationAddress.String()] = queue
+ }
+ log.Debug().Msgf("queue %v", queue)
+
+ // ask the queue to process the request
+ return queue.RequestIO(iocb)
+}
+
+func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu readWriteModel.APDU) error {
+ log.Debug().Msgf("_AppComplete %s\n%s", address, apdu)
+
+ // look up the queue
+ queue, ok := a.queueByAddress[address.String()]
+ if !ok {
+ log.Debug().Msgf("no queue for %s", address)
+ return nil
+ }
+ log.Debug().Msgf("queue %v", queue)
+
+ // make sure it has an active iocb
+ if queue.activeIOCB == nil {
+ log.Debug().Msgf("no active request for %s", address)
+ return nil
+ }
+
+ // this request is complete
+ switch apdu.(type) {
+ case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly:
+ queue.CompleteIO(queue.activeIOCB, apdu)
+ case readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly, readWriteModel.APDUAbortExactly:
+ // TODO: extract error
+ queue.AbortIO(queue.activeIOCB, errors.Errorf("%s", apdu))
+ default:
+ return errors.New("unrecognized APDU type")
+ }
+ log.Debug().Msg("controller finished")
+ // if the queue is empty and idle, forget about the controller
+ if len(queue.ioQueue.queue) == 0 && queue.activeIOCB == nil {
+ delete(a.queueByAddress, address.String())
+ }
+ return nil
+}
+
+func (a *ApplicationIOController) _AppRequest(apdu readWriteModel.APDU) {
+ log.Debug().Msgf("_AppRequest\n%s", apdu)
+
+ if err := a.Request(apdu); err != nil {
+ log.Error().Err(err).Msg("Uh oh")
+ return
+ }
+
+ // send it downstream, bypass the guard
+ if err := a.Application.Request(apdu); err != nil {
+ log.Error().Err(err).Msg("Uh oh")
+ return
+ }
+
+ // if this was an unconfirmed request, it's complete, no message
+ if _, ok := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); ok {
+ // TODO: where to get the destination now again??
+ a._AppComplete(nil, apdu)
+ }
+}
+
+func (a *ApplicationIOController) Request(apdu readWriteModel.APDU) error {
+ log.Debug().Msgf("Request\n%s", apdu)
+
+ // if this is not unconfirmed request, tell the application to use the IOCB interface
+ if _, ok := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); !ok {
+ return errors.New("use IOCB for confirmed requests")
+ }
+
+ // send it downstream
+ return a.Application.Request(apdu)
+}
+
+func (a *ApplicationIOController) Confirmation(apdu readWriteModel.APDU) error {
+ log.Debug().Msgf("Confirmation\n%s", apdu)
+
+ // this is an ack, error, reject or abort
+ // TODO: where to get the destination now again??
+ a._AppComplete(nil, apdu)
+ return nil
}
type BIPSimpleApplication struct {
*ApplicationIOController
- *WhoIsIAmServices
- *ReadWritePropertyServices
+ *service.WhoIsIAmServices
+ *service.ReadWritePropertyServices
localAddress interface{}
asap *ApplicationServiceAccessPoint
smap *StateMachineAccessPoint
@@ -233,7 +428,7 @@ type BIPSimpleApplication struct {
mux *UDPMultiplexer
}
-func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
b := &BIPSimpleApplication{}
var err error
b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go
deleted file mode 100644
index 33c36205e6..0000000000
--- a/plc4go/internal/bacnetip/Device.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
-
-type WhoIsIAmServices struct {
-}
-
-func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
- // TODO: implement me
- return nil, nil
-}
-
-var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
-var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
-
-// _LocalDeviceObjectDefault is a device entry with default entries
-var _LocalDeviceObjectDefault = LocalDeviceObject{
- MaximumApduLengthAccepted: &defaultMaxApduLength,
- SegmentationSupported: readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
- MaxSegmentsAccepted: &defaultMaxSegmentsAccepted,
- APDUSegmentTimeout: 5000,
- APDUTimeout: 3000,
- NumberOfAPDURetries: 3,
-}
-
-type LocalDeviceObject struct {
- NumberOfAPDURetries uint
- APDUTimeout uint
- SegmentationSupported readWriteModel.BACnetSegmentation
- APDUSegmentTimeout uint
- MaxSegmentsAccepted *readWriteModel.MaxSegmentsAccepted
- MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
-}
-
-func NewLocalDeviceObject() *LocalDeviceObject {
- return &LocalDeviceObject{
- NumberOfAPDURetries: _LocalDeviceObjectDefault.NumberOfAPDURetries,
- APDUTimeout: _LocalDeviceObjectDefault.APDUTimeout,
- SegmentationSupported: _LocalDeviceObjectDefault.SegmentationSupported,
- APDUSegmentTimeout: _LocalDeviceObjectDefault.APDUSegmentTimeout,
- MaxSegmentsAccepted: _LocalDeviceObjectDefault.MaxSegmentsAccepted,
- MaximumApduLengthAccepted: _LocalDeviceObjectDefault.MaximumApduLengthAccepted,
- }
-}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 330939d8bb..641a81ad37 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -112,18 +112,18 @@ func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event ap
type ApplicationManager struct {
sync.Mutex
- applications map[string]spi.MessageCodec
+ applications map[string]*ApplicationLayerMessageCodec
}
-func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) {
+func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (*ApplicationLayerMessageCodec, error) {
var localAddress *net.UDPAddr
+ var remoteAddr *net.UDPAddr
{
host := transportUrl.Host
port := transportUrl.Port()
if transportUrl.Port() == "" {
port = options["defaultUdpPort"][0]
}
- var remoteAddr *net.UDPAddr
if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
panic(err)
} else {
@@ -141,7 +141,7 @@ func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Trans
defer a.Unlock()
messageCodec, ok := a.applications[localAddress.String()]
if !ok {
- newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress)
+ newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress, remoteAddr)
if err != nil {
return nil, errors.Wrap(err, "error creating application layer code")
}
diff --git a/plc4go/internal/bacnetip/IOCBModule.go b/plc4go/internal/bacnetip/IOCBModule.go
new file mode 100644
index 0000000000..068ed17818
--- /dev/null
+++ b/plc4go/internal/bacnetip/IOCBModule.go
@@ -0,0 +1,589 @@
+package bacnetip
+
+import (
+ "container/heap"
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "github.com/apache/plc4x/plc4go/spi/plcerrors"
+ "github.com/apache/plc4x/plc4go/spi/utils"
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog/log"
+ "net"
+ "sync"
+ "time"
+)
+
+// IOCBState enumerates the life-cycle states an IOCB moves through.
+type IOCBState int
+
+const (
+	IOCBState_IDLE IOCBState = iota
+	IOCBState_PENDING
+	IOCBState_ACTIVE
+	IOCBState_COMPLETED
+	IOCBState_ABORTED
+)
+
+// String returns the upper-case name of the state, or "Unknown" for any value outside the
+// declared constants.
+func (i IOCBState) String() string {
+	names := [...]string{
+		IOCBState_IDLE:      "IDLE",
+		IOCBState_PENDING:   "PENDING",
+		IOCBState_ACTIVE:    "ACTIVE",
+		IOCBState_COMPLETED: "COMPLETED",
+		IOCBState_ABORTED:   "ABORTED",
+	}
+	if i < 0 || int(i) >= len(names) {
+		return "Unknown"
+	}
+	return names[i]
+}
+
+// IOQControllerStates enumerates the states of an IOQController.
+type IOQControllerStates int
+
+const (
+	IOQControllerStates_CTRL_IDLE IOQControllerStates = iota
+	IOQControllerStates_CTRL_ACTIVE
+	IOQControllerStates_CTRL_WAITING
+)
+
+// String returns the upper-case name of the state without the CTRL_ prefix, or "Unknown" for any
+// value outside the declared constants.
+func (i IOQControllerStates) String() string {
+	names := [...]string{
+		IOQControllerStates_CTRL_IDLE:    "IDLE",
+		IOQControllerStates_CTRL_ACTIVE:  "ACTIVE",
+		IOQControllerStates_CTRL_WAITING: "WAITING",
+	}
+	if i < 0 || int(i) >= len(names) {
+		return "Unknown"
+	}
+	return names[i]
+}
+
+// _IOCB is the control-block contract that the queue and controller code in this file works
+// against. *IOCB implements it.
+type _IOCB interface {
+ // setIOController binds the controller the block is processed by.
+ setIOController(ioController _IOController)
+ setIOState(newState IOCBState)
+ getIOState() IOCBState
+ setIOResponse(msg readWriteModel.APDU)
+ // Trigger signals completion and runs the registered callbacks.
+ Trigger()
+ setIOError(err error)
+ getRequest() readWriteModel.APDU
+ getDestination() net.Addr
+ // getPriority is the ordering key IOQueue.Put queues the block under.
+ getPriority() int
+ // clearQueue is called by IOQueue once the block has left (or is about to leave) the queue.
+ clearQueue()
+ // Abort ends the transaction with the given error.
+ Abort(err error) error
+}
+
+var _identNext = 1
+var _identLock sync.Mutex
+
+// IOCB is an I/O control block: it carries one request and records the response or error the
+// transaction ends with. Instances must be created with NewIOCB, which supplies the lock the
+// completion condition needs.
+type IOCB struct {
+	ioID         int
+	request      readWriteModel.APDU
+	destination  net.Addr
+	ioState      IOCBState
+	ioResponse   readWriteModel.APDU
+	ioError      error
+	ioController _IOController
+	// ioComplete is broadcast by Trigger; its lock guards ioCompleteDone.
+	ioComplete sync.Cond
+	// ioCompleteDone is set by Trigger so that late waiters and late callbacks see the completion.
+	ioCompleteDone bool
+	ioCallback     []func()
+	ioQueue        []_IOCB
+	ioTimeout      *time.Timer
+	ioTimoutCancel chan interface{}
+	priority       int
+}
+
+// NewIOCB creates an idle control block for request, addressed to destination, and assigns it the
+// next process-wide identity. The returned error is always nil.
+func NewIOCB(request readWriteModel.APDU, destination net.Addr) (*IOCB, error) {
+	// generate a unique identity for this block while holding the identity sequence lock
+	_identLock.Lock()
+	ioID := _identNext
+	_identNext++
+	_identLock.Unlock()
+
+	// debugging postponed until ID acquired
+	log.Debug().Msgf("NewIOCB(%d)", ioID)
+
+	return &IOCB{
+		// save the ID
+		ioID: ioID,
+
+		// save the request parameter
+		request:     request,
+		destination: destination,
+
+		// start with an idle request
+		ioState: IOCBState_IDLE,
+
+		// a sync.Cond without a Locker panics in Wait, so give it one
+		ioComplete: sync.Cond{L: &sync.Mutex{}},
+	}, nil
+}
+
+// AddCallback Pass a function to be called when IO is complete. If the block has already been
+// triggered the callbacks are run again right away.
+func (i *IOCB) AddCallback(fn func()) {
+	log.Debug().Msgf("AddCallback(%d): %t", i.ioID, fn != nil)
+	// store it
+	i.ioCallback = append(i.ioCallback, fn)
+
+	// already complete?
+	i.ioComplete.L.Lock()
+	done := i.ioCompleteDone
+	i.ioComplete.L.Unlock()
+	if done {
+		i.Trigger()
+	}
+}
+
+// Wait blocks until Trigger has been called; it returns immediately if that already happened.
+func (i *IOCB) Wait() {
+	log.Debug().Msgf("Wait(%d)", i.ioID)
+	i.ioComplete.L.Lock()
+	defer i.ioComplete.L.Unlock()
+	// checking the flag (instead of waiting unconditionally) covers a Trigger that ran before Wait
+	for !i.ioCompleteDone {
+		i.ioComplete.Wait()
+	}
+}
+
+// Trigger Set the completion event and make the callback(s)
+func (i *IOCB) Trigger() {
+	log.Debug().Msgf("Trigger(%d)", i.ioID)
+
+	// if it's queued, remove it from its queue
+	myIndex := -1
+	var meAsInterface _IOCB = i
+	for index, qe := range i.ioQueue {
+		if qe == meAsInterface {
+			myIndex = index
+		}
+	}
+	if myIndex >= 0 {
+		log.Debug().Msg("dequeue")
+		i.ioQueue = append(i.ioQueue[:myIndex], i.ioQueue[myIndex+1:]...)
+	}
+
+	// if there's a timer, cancel it
+	if i.ioTimeout != nil {
+		log.Debug().Msg("cancel timeout")
+		i.ioTimeout.Stop()
+	}
+
+	// set the completion event: record it under the lock, then wake everybody in Wait
+	i.ioComplete.L.Lock()
+	i.ioCompleteDone = true
+	i.ioComplete.L.Unlock()
+	i.ioComplete.Broadcast()
+	log.Debug().Msg("complete event set")
+
+	// make callback(s); AddCallback accepts nil, so skip those instead of panicking
+	for _, f := range i.ioCallback {
+		if f != nil {
+			f()
+		}
+	}
+}
+
+// Complete finishes the transaction with apdu as its response. It is usually called when
+// ProcessIO has shipped the IOCB off to some other thread or function. With a controller bound the
+// call is delegated to it; otherwise the block records the result itself and triggers.
+func (i *IOCB) Complete(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Complete(%d)\n%s", i.ioID, apdu)
+
+	if controller := i.ioController; controller != nil {
+		// pass to the controller
+		return controller.CompleteIO(i, apdu)
+	}
+
+	// no controller: just fill in the data
+	i.ioState = IOCBState_COMPLETED
+	i.ioResponse = apdu
+	i.Trigger()
+	return nil
+}
+
+// Abort Called by a client to abort a transaction. With a controller bound the call is delegated
+// to it; otherwise the block records err itself and triggers.
+func (i *IOCB) Abort(err error) error {
+	log.Debug().Err(err).Msgf("Abort(%d)", i.ioID)
+
+	// Stop the timeout watcher, if there is one. Closing a nil channel (no timeout was ever set) or
+	// an already closed one (second Abort) panics, so close at most once and only when it exists.
+	if cancel := i.ioTimoutCancel; cancel != nil {
+		i.ioTimoutCancel = nil
+		defer close(cancel)
+	}
+
+	if i.ioController != nil {
+		// pass to the controller
+		return i.ioController.AbortIO(i, err)
+	}
+
+	// just fill in the data
+	i.ioState = IOCBState_ABORTED
+	i.ioError = err
+	i.Trigger()
+	return nil
+}
+
+// SetTimeout Called to set a transaction timer. When the timer fires the block is aborted with a
+// timeout error carrying the time that was waited.
+func (i *IOCB) SetTimeout(delay time.Duration) {
+	// if one has already been created, re-arm it; the watcher started below keeps listening
+	if i.ioTimeout != nil {
+		i.ioTimeout.Reset(delay)
+		return
+	}
+
+	now := time.Now()
+	timer := time.NewTimer(delay)
+	cancel := make(chan interface{})
+	i.ioTimeout = timer
+	i.ioTimoutCancel = cancel
+
+	// the watcher works on its own copies so later changes to the fields cannot redirect it
+	go func() {
+		select {
+		case firedAt := <-timer.C:
+			// firedAt is later than now, so this is the (positive) duration that elapsed
+			_ = i.Abort(plcerrors.NewTimeoutError(firedAt.Sub(now)))
+		case <-cancel:
+		}
+	}()
+}
+
+// setIOController binds the controller that Complete and Abort delegate to.
+func (i *IOCB) setIOController(ioController _IOController) {
+ i.ioController = ioController
+}
+
+// setIOState overwrites the current state; no transition check is done here.
+func (i *IOCB) setIOState(newState IOCBState) {
+ i.ioState = newState
+}
+
+// getIOState returns the current state.
+func (i *IOCB) getIOState() IOCBState {
+ return i.ioState
+}
+
+// setIOResponse stores the response of the transaction.
+func (i *IOCB) setIOResponse(msg readWriteModel.APDU) {
+ i.ioResponse = msg
+}
+
+// setIOError stores the error the transaction ended with.
+func (i *IOCB) setIOError(err error) {
+ i.ioError = err
+}
+
+// getRequest returns the request passed to NewIOCB.
+func (i *IOCB) getRequest() readWriteModel.APDU {
+ return i.request
+}
+
+// getDestination returns the destination passed to NewIOCB.
+func (i *IOCB) getDestination() net.Addr {
+ return i.destination
+}
+
+// getPriority returns the priority the block is queued under.
+func (i *IOCB) getPriority() int {
+ return i.priority
+}
+
+// clearQueue resets the ioQueue field that Trigger searches when dequeuing.
+func (i *IOCB) clearQueue() {
+ i.ioQueue = nil
+}
+
+// A PriorityItem is something we manage in a priority queue.
+type PriorityItem struct {
+ value _IOCB // The value of the item; arbitrary.
+ priority int // The priority of the item in the queue.
+ // The index is needed by update and is maintained by the heap.Interface methods.
+ index int // The index of the item in the heap.
+}
+
+// A PriorityQueue implements heap.Interface and holds pointers to PriorityItems. It is meant to be
+// driven through the container/heap functions (heap.Push, heap.Pop, heap.Fix, heap.Remove).
+type PriorityQueue []*PriorityItem
+
+// Len is part of heap.Interface.
+func (pq PriorityQueue) Len() int { return len(pq) }
+
+// Less is part of heap.Interface; it orders higher priorities first.
+func (pq PriorityQueue) Less(i, j int) bool {
+ // We want Pop to give us the highest, not lowest, priority so we use greater than here.
+ return pq[i].priority > pq[j].priority
+}
+
+// Swap is part of heap.Interface; it keeps the index recorded in both items in sync.
+func (pq PriorityQueue) Swap(i, j int) {
+ pq[i], pq[j] = pq[j], pq[i]
+ pq[i].index = i
+ pq[j].index = j
+}
+
+// Push is part of heap.Interface. x must be a *PriorityItem; any other type makes the type
+// assertion panic.
+func (pq *PriorityQueue) Push(x any) {
+ n := len(*pq)
+ item := x.(*PriorityItem)
+ item.index = n
+ *pq = append(*pq, item)
+}
+
+// Pop is part of heap.Interface. It removes the last slice element and returns it as a
+// *PriorityItem.
+func (pq *PriorityQueue) Pop() any {
+ old := *pq
+ n := len(old)
+ item := old[n-1]
+ old[n-1] = nil // avoid memory leak
+ item.index = -1 // for safety
+ *pq = old[0 : n-1]
+ return item
+}
+
+// update modifies the priority and value of an item in the queue and restores the heap order.
+func (pq *PriorityQueue) update(item *PriorityItem, value _IOCB, priority int) {
+ item.value = value
+ item.priority = priority
+ heap.Fix(pq, item.index)
+}
+
+// IOQueue is a priority queue of pending IOCBs that consumers can block on. Instances must be
+// created with NewIOQueue, which supplies the lock the condition variable needs.
+type IOQueue struct {
+	// notEmpty is broadcast by Put; its lock guards queue in Put and Get.
+	notEmpty sync.Cond
+	queue    PriorityQueue
+}
+
+// NewIOQueue creates an empty queue. The name is only used for the debug log.
+func NewIOQueue(name string) *IOQueue {
+	log.Debug().Msgf("NewIOQueue %s", name)
+	// a sync.Cond without a Locker panics in Wait, so give it one
+	return &IOQueue{notEmpty: sync.Cond{L: &sync.Mutex{}}}
+}
+
+// Put an IOCB to a queue. This is usually called by the function that filters requests and passes them out to the
+// correct processing thread. It returns an error when the IOCB is not in state PENDING.
+func (i *IOQueue) Put(iocb _IOCB) error {
+	log.Debug().Msgf("Put %s", iocb)
+
+	// requests should be pending before being queued
+	if iocb.getIOState() != IOCBState_PENDING {
+		return errors.New("invalid state transition")
+	}
+
+	// PriorityQueue.Push asserts *PriorityItem, so the item has to be pushed by pointer; the
+	// index is filled in by Push
+	i.notEmpty.L.Lock()
+	heap.Push(&i.queue, &PriorityItem{value: iocb, priority: iocb.getPriority()})
+	i.notEmpty.L.Unlock()
+
+	i.notEmpty.Broadcast()
+	return nil
+}
+
+// Get a request from a queue, optionally block until a request is available.
+//
+// With block false an empty queue yields (nil, nil) immediately. With block true the call waits
+// for a Put; if delay is non-nil it gives up after that duration and returns (nil, nil). The
+// returned error is always nil.
+func (i *IOQueue) Get(block bool, delay *time.Duration) (_IOCB, error) {
+	log.Debug().Msgf("Get block=%t, delay=%s", block, delay)
+
+	i.notEmpty.L.Lock()
+	defer i.notEmpty.L.Unlock()
+
+	// if the queue is empty, and we do not block return None
+	if !block && len(i.queue) == 0 {
+		log.Debug().Msgf("not blocking and empty")
+		return nil, nil
+	}
+
+	// wait for something to be in the queue
+	if len(i.queue) == 0 {
+		expired := false
+		if delay != nil {
+			// sync.Cond has no timed wait, so let a timer flag the timeout and wake us up
+			timeout := time.AfterFunc(*delay, func() {
+				// taking the lock guarantees the waiter below is already parked in Wait
+				i.notEmpty.L.Lock()
+				defer i.notEmpty.L.Unlock()
+				expired = true
+				i.notEmpty.Broadcast()
+			})
+			defer utils.CleanupTimer(timeout)
+		}
+		for len(i.queue) == 0 && !expired {
+			i.notEmpty.Wait()
+		}
+	}
+
+	// still nothing there: the wait timed out
+	if len(i.queue) == 0 {
+		return nil, nil
+	}
+
+	// extract the first element; the heap hands back the *PriorityItem that Put pushed
+	pi := heap.Pop(&i.queue).(*PriorityItem)
+	iocb := pi.value
+	iocb.clearQueue()
+
+	// return the request
+	return iocb, nil
+}
+
+// Remove takes a control block out of the queue; it is called if the request is canceled/aborted.
+// Only the first matching entry is removed, a block that is not queued is ignored, and the
+// returned error is always nil.
+func (i *IOQueue) Remove(iocb _IOCB) error {
+	for _, candidate := range i.queue {
+		if candidate.value != iocb {
+			continue
+		}
+		heap.Remove(&i.queue, candidate.index)
+
+		if len(i.queue) == 0 {
+			i.notEmpty.Broadcast()
+		}
+		return nil
+	}
+	return nil
+}
+
+// Abort aborts every control block in the queue with err and empties the queue. Errors returned
+// by the individual blocks are ignored.
+func (i *IOQueue) Abort(err error) {
+ for _, item := range i.queue {
+ item.value.clearQueue()
+ _ = item.value.Abort(err)
+ }
+
+ // drop all entries
+ i.queue = nil
+
+ // the queue is now empty
+ // NOTE(review): Broadcast wakes waiters, it does not "clear" anything — confirm this is intended
+ i.notEmpty.Broadcast()
+}
+
+// _IOController is the controller contract: IOController stores one as rootStruct and calls
+// ProcessIO/AbortIO through it, and an IOCB delegates Complete/Abort to the one it is bound to.
+type _IOController interface {
+ // Abort aborts all requests of the controller.
+ Abort(err error) error
+ // ProcessIO decides how to handle a request.
+ ProcessIO(iocb _IOCB) error
+ // CompleteIO finishes iocb with apdu as its response.
+ CompleteIO(iocb _IOCB, apdu readWriteModel.APDU) error
+ // AbortIO finishes iocb with err.
+ AbortIO(iocb _IOCB, err error) error
+}
+
+// IOController is the base controller. rootStruct is the value ProcessIO and AbortIO are
+// dispatched to from RequestIO.
+type IOController struct {
+ name string
+ rootStruct _IOController
+}
+
+// NewIOController creates a controller called name that dispatches to rootStruct. The returned
+// error is always nil.
+func NewIOController(name string, rootStruct _IOController) (*IOController, error) {
+ log.Debug().Msgf("NewIOController name=%s", name)
+ return &IOController{
+ // save the name
+ name: name,
+ rootStruct: rootStruct,
+ }, nil
+}
+
+// Abort all requests, no default implementation: this version does nothing and returns nil.
+func (i *IOController) Abort(err error) error {
+ return nil
+}
+
+// RequestIO is called by a client to start processing a request. It binds iocb to this
+// controller, marks it PENDING and hands it to rootStruct.ProcessIO; if that fails the request is
+// aborted through rootStruct.AbortIO and the result of the abort is returned.
+func (i *IOController) RequestIO(iocb _IOCB) error {
+	log.Debug().Msgf("RequestIO\n%s", iocb)
+
+	// bind the iocb to this controller and change the state
+	iocb.setIOController(i)
+	iocb.setIOState(IOCBState_PENDING)
+
+	// let derived class figure out how to process this, abort the request on error
+	if processErr := i.rootStruct.ProcessIO(iocb); processErr != nil {
+		return i.rootStruct.AbortIO(iocb, processErr)
+	}
+	return nil
+}
+
+// ProcessIO Figure out how to respond to this request. This must be provided by the derived class;
+// the base version always returns an error.
+func (i *IOController) ProcessIO(iocb _IOCB) error {
+ return errors.New("IOController must implement process_io()")
+}
+
+// ActiveIO is called by a handler to notify the controller that a request is being processed.
+// It moves iocb to ACTIVE and returns an error if iocb is neither IDLE nor PENDING.
+func (i *IOController) ActiveIO(iocb _IOCB) error {
+	log.Debug().Msgf("ActiveIO %s", iocb)
+
+	switch current := iocb.getIOState(); current {
+	case IOCBState_IDLE, IOCBState_PENDING:
+		// requests should be idle or pending before coming active
+		iocb.setIOState(IOCBState_ACTIVE)
+		return nil
+	default:
+		return errors.Errorf("invalid state transition (currently %d)", current)
+	}
+}
+
+// CompleteIO is called by a handler to return data to the client. A block that is already
+// COMPLETED or ABORTED is left untouched; otherwise it is marked COMPLETED, gets apdu as its
+// response and is triggered. The returned error is always nil.
+func (i *IOController) CompleteIO(iocb _IOCB, apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("CompleteIO %s\n%s", iocb, apdu)
+
+	switch iocb.getIOState() {
+	case IOCBState_COMPLETED, IOCBState_ABORTED:
+		// it already finished one way or the other, leave it alone
+		return nil
+	}
+
+	// change the state and notify the client
+	iocb.setIOState(IOCBState_COMPLETED)
+	iocb.setIOResponse(apdu)
+	iocb.Trigger()
+
+	return nil
+}
+
+// AbortIO is called by a handler or a client to abort a transaction. A block that is already
+// COMPLETED or ABORTED is left untouched; otherwise it is marked ABORTED, gets err as its error
+// and is triggered. The returned error is always nil.
+func (i *IOController) AbortIO(iocb _IOCB, err error) error {
+	log.Debug().Err(err).Msgf("AbortIO %s", iocb)
+
+	switch iocb.getIOState() {
+	case IOCBState_COMPLETED, IOCBState_ABORTED:
+		// it already finished one way or the other, leave it alone
+		return nil
+	}
+
+	// change the state and notify the client
+	iocb.setIOState(IOCBState_ABORTED)
+	iocb.setIOError(err)
+	iocb.Trigger()
+
+	return nil
+}
+
+// IOQController is an IOController that additionally tracks a controller state, the IOCB that is
+// currently active and a queue for further IOCBs.
+type IOQController struct {
+ *IOController
+ state IOQControllerStates
+ activeIOCB _IOCB
+ ioQueue *IOQueue
+}
+
+// NewIOQController creates an idle controller called name with an empty queue. The embedded
+// IOController gets the new IOQController itself as rootStruct.
+func NewIOQController(name string) (*IOQController, error) {
+ i := &IOQController{}
+ var err error
+ i.IOController, err = NewIOController(name, i)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating IO controller")
+ }
+
+ // start idle
+ i.state = IOQControllerStates_CTRL_IDLE
+ log.Debug().Msgf("%s %s %s", time.Now(), name, i.state)
+
+ // no active iocb
+ i.activeIOCB = nil
+
+ // create an IOQueue for iocb's requested when not idle
+ i.ioQueue = NewIOQueue(name + " queue")
+
+ return i, nil
+}
+
+// TODO: implement functions of IOQController
+
+// SieveQueue is an IOQController for a single address that forwards each request to requestFn.
+type SieveQueue struct {
+ *IOQController
+ requestFn func(apdu readWriteModel.APDU)
+ address net.Addr
+}
+
+// NewSieveQueue creates a SieveQueue whose controller is named after address.String(); address
+// must therefore not be nil. fn is the function ProcessIO sends requests with.
+func NewSieveQueue(fn func(apdu readWriteModel.APDU), address net.Addr) (*SieveQueue, error) {
+ s := &SieveQueue{}
+ var err error
+ s.IOQController, err = NewIOQController(address.String())
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating a IOQController")
+ }
+
+ // Save a reference to the request function
+ s.requestFn = fn
+ s.address = address
+ return s, nil
+}
+
+// ProcessIO marks iocb as active and sends its request through requestFn. If iocb cannot become
+// active (it is neither IDLE nor PENDING) the error is returned and nothing is sent.
+func (s *SieveQueue) ProcessIO(iocb _IOCB) error {
+	log.Debug().Msgf("ProcessIO %s", iocb)
+
+	// this is now an active request; do not send anything for a block in the wrong state
+	if err := s.ActiveIO(iocb); err != nil {
+		return errors.Wrap(err, "error activating IOCB")
+	}
+
+	// send the request
+	s.requestFn(iocb.getRequest())
+	return nil
+}
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 670cc00ddc..45b51af713 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -21,6 +21,7 @@ package bacnetip
import (
"context"
+ "github.com/apache/plc4x/plc4go/internal/bacnetip/local"
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
@@ -38,17 +39,23 @@ type ApplicationLayerMessageCodec struct {
bipSimpleApplication *BIPSimpleApplication
messageCode *MessageCodec
deviceInfoCache DeviceInfoCache
+
+ localAddress *net.UDPAddr
+ remoteAddress *net.UDPAddr
}
-func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
+func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr, remoteAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
// Have the transport create a new transport-instance.
transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
if err != nil {
return nil, errors.Wrap(err, "error creating transport instance")
}
_ = transportInstance
- a := &ApplicationLayerMessageCodec{}
- application, err := NewBIPSimpleApplication(LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
+ a := &ApplicationLayerMessageCodec{
+ localAddress: localAddress,
+ remoteAddress: remoteAddress,
+ }
+ application, err := NewBIPSimpleApplication(&local.LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
if err != nil {
return nil, err
}
@@ -81,7 +88,22 @@ func (m *ApplicationLayerMessageCodec) IsRunning() bool {
}
func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
- panic("not yet mapped")
+ iocb, err := NewIOCB(message.(model.APDU), m.remoteAddress)
+ if err != nil {
+ return errors.Wrap(err, "error creating IOCB")
+ }
+ go func() {
+ go m.bipSimpleApplication.RequestIO(iocb)
+ iocb.Wait()
+ if iocb.ioError != nil {
+ // TODO: handle error
+ } else if iocb.ioResponse != nil {
+ // TODO: response?
+ } else {
+ // TODO: what now?
+ }
+ }()
+ return nil
}
func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
diff --git a/plc4go/internal/bacnetip/local/Device.go b/plc4go/internal/bacnetip/local/Device.go
new file mode 100644
index 0000000000..752013a2ff
--- /dev/null
+++ b/plc4go/internal/bacnetip/local/Device.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package local
+
+import (
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+)
+
+// LocalDeviceObject bundles the settings of the local device.
+// NOTE(review): the units of the timeout fields are not visible here (presumably milliseconds) — confirm.
+type LocalDeviceObject struct {
+ NumberOfAPDURetries uint
+ APDUTimeout uint
+ SegmentationSupported readWriteModel.BACnetSegmentation
+ APDUSegmentTimeout uint
+ // MaxSegmentsAccepted and MaximumApduLengthAccepted are pointers and may be nil.
+ MaxSegmentsAccepted *readWriteModel.MaxSegmentsAccepted
+ MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
+ App interface{}
+ ObjectName string
+ ObjectIdentifier string
+}
diff --git a/plc4go/internal/bacnetip/service/Device.go b/plc4go/internal/bacnetip/service/Device.go
new file mode 100644
index 0000000000..ae8e18ff9b
--- /dev/null
+++ b/plc4go/internal/bacnetip/service/Device.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package service
+
+// WhoIsIAmServices is a placeholder type; it has no fields or behaviour yet.
+type WhoIsIAmServices struct {
+}
+
+// NewWhoIsIAmServices creates the (still empty) service value. The returned error is always nil.
+func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
+	// TODO: implement me
+	// Hand out a usable value: a nil pointer together with a nil error would only fail later, at
+	// the first dereference in the caller.
+	return &WhoIsIAmServices{}, nil
+}
diff --git a/plc4go/internal/bacnetip/service/Object.go b/plc4go/internal/bacnetip/service/Object.go
new file mode 100644
index 0000000000..946f5d5fa5
--- /dev/null
+++ b/plc4go/internal/bacnetip/service/Object.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package service
+
+// ReadWritePropertyServices is a placeholder type; it has no fields or behaviour yet.
+type ReadWritePropertyServices struct {
+}
+
+// NewReadWritePropertyServices creates the (still empty) service value. The returned error is
+// always nil.
+func NewReadWritePropertyServices() (*ReadWritePropertyServices, error) {
+	// TODO: implement me
+	// Hand out a usable value: a nil pointer together with a nil error would only fail later, at
+	// the first dereference in the caller.
+	return &ReadWritePropertyServices{}, nil
+}