You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/14 15:42:07 UTC
[plc4x] branch develop updated: feat(plc4go/bacnet): use upstream device info cache
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 83606befb0 feat(plc4go/bacnet): use upstream device info cache
83606befb0 is described below
commit 83606befb06f9012aa68339dd328d068a83b9a54
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Nov 14 14:16:31 2022 +0100
feat(plc4go/bacnet): use upstream device info cache
---
plc4go/internal/bacnetip/ApplicationLayer.go | 71 ++++++-----
plc4go/internal/bacnetip/ApplicationModule.go | 176 +++++++++++++++++++++++++-
plc4go/internal/bacnetip/Device.go | 22 ++++
plc4go/internal/bacnetip/DeviceInventory.go | 1 +
4 files changed, 235 insertions(+), 35 deletions(-)
diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go
index c2a1324adc..04dc50f415 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -75,8 +75,8 @@ type segmentAPDU struct {
type SSMSAPRequirements interface {
_ServiceAccessPoint
_Client
- GetDeviceInventory() *DeviceInventory
- GetLocalDevice() DeviceEntry
+ GetDeviceInfoCache() *DeviceInfoCache
+ GetLocalDevice() LocalDeviceObject
GetProposedWindowSize() uint8
GetClientTransactions() []*ClientSSM
GetServerTransactions() []*ServerSSM
@@ -89,8 +89,8 @@ type SSM struct {
ssmSAP SSMSAPRequirements
- pduAddress []byte
- deviceEntry *DeviceEntry
+ pduAddress []byte
+ deviceInfo *DeviceInfo
invokeId uint8
@@ -116,15 +116,16 @@ type SSM struct {
func NewSSM(sap SSMSAPRequirements, pduAddress []byte) (SSM, error) {
log.Debug().Interface("sap", sap).Bytes("pdu_address", pduAddress).Msg("init")
- deviceEntry, err := sap.GetDeviceInventory().getEntryForDestination(pduAddress)
- if err != nil {
- return SSM{}, errors.Wrap(err, "Can't create SSM")
+ var deviceInfo *DeviceInfo
+ deviceInfoTemp, ok := sap.GetDeviceInfoCache().GetDeviceInfo(DeviceInfoCacheKey{PduSource: pduAddress})
+ if ok {
+ deviceInfo = &deviceInfoTemp
}
localDevice := sap.GetLocalDevice()
return SSM{
ssmSAP: sap,
pduAddress: pduAddress,
- deviceEntry: deviceEntry,
+ deviceInfo: deviceInfo,
state: IDLE,
numberOfApduRetries: localDevice.NumberOfAPDURetries,
apduTimeout: localDevice.APDUTimeout,
@@ -342,7 +343,7 @@ func NewClientSSM(sap SSMSAPRequirements, pduAddress []byte) (*ClientSSM, error)
return nil, err
}
// TODO: if deviceEntry is not there get it now...
- if ssm.deviceEntry == nil {
+ if ssm.deviceInfo == nil {
// TODO: get entry for device, store it in inventory
log.Debug().Msg("Accquire device information")
}
@@ -362,7 +363,7 @@ func (s *ClientSSM) setState(newState SSMState, timer *uint) error {
if s.state == COMPLETED || s.state == ABORTED {
log.Debug().Msg("remove from active transaction")
s.ssmSAP.GetClientTransactions() // TODO remove "this" transaction from the list
- if s.deviceEntry == nil {
+ if s.deviceInfo == nil {
// TODO: release device entry
log.Debug().Msg("release device entry")
}
@@ -395,13 +396,13 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
// if the max apdu length of the server isn't known, assume that it is the same size as our own and will be the segment
// size
- if s.deviceEntry == nil || s.deviceEntry.MaximumApduLengthAccepted != nil {
+ if s.deviceInfo == nil || s.deviceInfo.MaximumApduLengthAccepted != nil {
s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets())
- } else if s.deviceEntry.MaximumNpduLength == nil {
+ } else if s.deviceInfo.MaximumNpduLength == nil {
// if the max npdu length of the server isn't known, assume that it is the same as the max apdu length accepted
s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets())
} else {
- s.segmentSize = utils.Min(*s.deviceEntry.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets()))
+ s.segmentSize = utils.Min(*s.deviceInfo.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets()))
}
log.Debug().Msgf("segment size %d", s.segmentSize)
@@ -426,9 +427,9 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
return s.Response(abort)
}
- if s.deviceEntry == nil {
+ if s.deviceInfo == nil {
log.Debug().Msg("no server info for segmentation support")
- } else if s.deviceEntry.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT && s.deviceEntry.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_BOTH {
+ } else if *s.deviceInfo.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT && *s.deviceInfo.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_BOTH {
log.Debug().Msg("server can't receive segmented requests")
abort, err := s.abort(readWriteModel.BACnetAbortReason_SEGMENTATION_NOT_SUPPORTED)
if err != nil {
@@ -438,11 +439,11 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
}
// make sure we don't exceed the number of segments in our request that the server said it was willing to accept
- if s.deviceEntry == nil {
+ if s.deviceInfo == nil {
log.Debug().Msg("no server info for maximum number of segments")
- } else if s.deviceEntry.MaxSegmentsAccepted == nil {
+ } else if s.deviceInfo.MaxSegmentsAccepted == nil {
log.Debug().Msgf("server doesn't say maximum number of segments")
- } else if s.segmentCount > s.deviceEntry.MaxSegmentsAccepted.MaxSegments() {
+ } else if s.segmentCount > s.deviceInfo.MaxSegmentsAccepted.MaxSegments() {
log.Debug().Msg("server can't receive enough segments")
abort, err := s.abort(readWriteModel.BACnetAbortReason_APDU_TOO_LONG)
if err != nil {
@@ -909,7 +910,7 @@ func NewServerSSM(sap SSMSAPRequirements, pduAddress []byte) (*ServerSSM, error)
return nil, err
}
// TODO: if deviceEntry is not there get it now...
- if &ssm.deviceEntry == nil {
+ if &ssm.deviceInfo == nil {
// TODO: get entry for device, store it in inventory
log.Debug().Msg("Accquire device information")
}
@@ -930,7 +931,7 @@ func (s *ServerSSM) setState(newState SSMState, timer *uint) error {
if s.state == COMPLETED || s.state == ABORTED {
log.Debug().Msg("remove from active transaction")
s.ssmSAP.GetServerTransactions() // TODO remove "this" transaction from the list
- if s.deviceEntry != nil {
+ if s.deviceInfo != nil {
// TODO: release device entry
log.Debug().Msg("release device entry")
}
@@ -1020,10 +1021,10 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
// the segment size is the minimum of the size of the largest packet that can be delivered to the client and the
// largest it can accept
- if s.deviceEntry == nil || s.deviceEntry.MaximumNpduLength == nil {
+ if s.deviceInfo == nil || s.deviceInfo.MaximumNpduLength == nil {
s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets())
} else {
- s.segmentSize = utils.Min(*s.deviceEntry.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets()))
+ s.segmentSize = utils.Min(*s.deviceInfo.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets()))
}
// compute the segment count
@@ -1158,16 +1159,18 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
s.segmentedResponseAccepted = apduConfirmedRequest.GetSegmentedResponseAccepted()
// if there is a cache record, check to see if it needs to be updated
- if apduConfirmedRequest.GetSegmentedResponseAccepted() && s.deviceEntry != nil {
- switch s.deviceEntry.SegmentationSupported {
+ if apduConfirmedRequest.GetSegmentedResponseAccepted() && s.deviceInfo != nil {
+ switch *s.deviceInfo.SegmentationSupported {
case readWriteModel.BACnetSegmentation_NO_SEGMENTATION:
log.Debug().Msg("client actually supports segmented receive")
- s.deviceEntry.SegmentationSupported = readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE
+ segmentedReceive := readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE
+ s.deviceInfo.SegmentationSupported = &segmentedReceive
// TODO: bacpypes updates the cache here, but as we have a pointer to the entry we shouldn't need that. Maybe we should because of concurrency... let's see later
case readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT:
log.Debug().Msg("client actually supports both segmented transmit and receive")
- s.deviceEntry.SegmentationSupported = readWriteModel.BACnetSegmentation_SEGMENTED_BOTH
+ segmentedBoth := readWriteModel.BACnetSegmentation_SEGMENTED_BOTH
+ s.deviceInfo.SegmentationSupported = &segmentedBoth
// TODO: bacpypes updates the cache here, but as we have a pointer to the entry we shouldn't need that. Maybe we should because of concurrency... let's see later
case readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE, readWriteModel.BACnetSegmentation_SEGMENTED_BOTH:
@@ -1182,11 +1185,11 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
// received
getMaxApduLengthAccepted := apduConfirmedRequest.GetMaxApduLengthAccepted()
s.maxApduLengthAccepted = &getMaxApduLengthAccepted
- if s.deviceEntry != nil && s.deviceEntry.MaximumApduLengthAccepted != nil {
- if *s.deviceEntry.MaximumApduLengthAccepted < *s.maxApduLengthAccepted {
+ if s.deviceInfo != nil && s.deviceInfo.MaximumApduLengthAccepted != nil {
+ if *s.deviceInfo.MaximumApduLengthAccepted < *s.maxApduLengthAccepted {
log.Debug().Msg("apdu max reponse encoding error")
} else {
- s.maxApduLengthAccepted = s.deviceEntry.MaximumApduLengthAccepted
+ s.maxApduLengthAccepted = s.deviceInfo.MaximumApduLengthAccepted
}
}
log.Debug().Msgf("maxApduLengthAccepted %s", *s.maxApduLengthAccepted)
@@ -1459,8 +1462,8 @@ type StateMachineAccessPoint struct {
*Client
*ServiceAccessPoint
- localDevice DeviceEntry
- deviceInventory *DeviceInventory
+ localDevice LocalDeviceObject
+ deviceInventory *DeviceInfoCache
nextInvokeId uint8
clientTransactions []*ClientSSM
serverTransactions []*ServerSSM
@@ -1475,7 +1478,7 @@ type StateMachineAccessPoint struct {
applicationTimeout uint
}
-func NewStateMachineAccessPoint(localDevice DeviceEntry, deviceInventory *DeviceInventory, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
+func NewStateMachineAccessPoint(localDevice LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
log.Debug().Msgf("NewStateMachineAccessPoint localDevice=%v deviceInventory=%v sap=%v cid=%v", localDevice, deviceInventory, sapID, cid)
s := &StateMachineAccessPoint{
@@ -1788,11 +1791,11 @@ func (s *StateMachineAccessPoint) SapConfirmation(apdu readWriteModel.APDU, pduD
return nil
}
-func (s *StateMachineAccessPoint) GetDeviceInventory() *DeviceInventory {
+func (s *StateMachineAccessPoint) GetDeviceInfoCache() *DeviceInfoCache {
return s.deviceInventory
}
-func (s *StateMachineAccessPoint) GetLocalDevice() DeviceEntry {
+func (s *StateMachineAccessPoint) GetLocalDevice() LocalDeviceObject {
return s.localDevice
}
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 17aba59782..0ae673addd 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -20,10 +20,184 @@
package bacnetip
import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
+ "hash/fnv"
)
+type DeviceInfo struct { // DeviceInfo is the per-device record that DeviceInfoCache stores by value.
+ DeviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier // its instance number is used when building cache keys
+ Address []byte // device address; IAmDeviceInfo fills it with the PDU source for new records
+
+ MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted // overwritten from each I-Am by IAmDeviceInfo
+ SegmentationSupported *readWriteModel.BACnetSegmentation // overwritten from each I-Am by IAmDeviceInfo
+ MaxSegmentsAccepted *readWriteModel.MaxSegmentsAccepted // not set by NewDeviceInfo or IAmDeviceInfo; nil unless assigned elsewhere
+ VendorId *readWriteModel.BACnetVendorId // overwritten from each I-Am by IAmDeviceInfo
+ MaximumNpduLength *uint // not set by NewDeviceInfo or IAmDeviceInfo; nil unless assigned elsewhere
+
+ _refCount int // incremented by Acquire, decremented by Release
+ _cacheKey DeviceInfoCacheKey // combined key written by UpdateDeviceInfo; Release stores the record back under it
+}
+
+func NewDeviceInfo(deviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier, address []byte) *DeviceInfo { // NewDeviceInfo returns a record for the given identifier and address with default APDU length and segmentation values.
+ return &DeviceInfo{
+ DeviceIdentifier: deviceIdentifier,
+ Address: address,
+
+ MaximumApduLengthAccepted: func() *readWriteModel.MaxApduLengthAccepted { // default: NUM_OCTETS_1024
+ octets1024 := readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024 // local copy so its address can be taken
+ return &octets1024
+ }(),
+ SegmentationSupported: func() *readWriteModel.BACnetSegmentation { // default: NO_SEGMENTATION
+ noSegmentation := readWriteModel.BACnetSegmentation_NO_SEGMENTATION // local copy so its address can be taken
+ return &noSegmentation
+ }(),
+ } // all remaining fields keep their zero values (nil pointers, zero ref count, empty cache key)
+}
+
+// DeviceInfoCacheKey identifies a cache entry by Instance, by PduSource, or by both.
+type DeviceInfoCacheKey struct {
+ Instance *uint32 // device instance number; when nil, HashKey leaves it out of the hash
+ PduSource []byte // source address bytes; a nil or empty slice contributes nothing to the hash
+}
+
+func (k DeviceInfoCacheKey) HashKey() uint32 { // HashKey returns the 32-bit FNV-1a hash of Instance (big-endian, when set) followed by PduSource.
+ h := fnv.New32a()
+ if k.Instance != nil {
+ _ = binary.Write(h, binary.BigEndian, *k.Instance) // error ignored: writes to a hash.Hash are documented as never failing
+ }
+ _, _ = h.Write(k.PduSource) // error ignored for the same reason
+ return h.Sum32() // NOTE(review): this hash is the only map key in DeviceInfoCache, so two keys with colliding hashes would share one entry
+}
+
+func (k DeviceInfoCacheKey) String() string { // String renders the key for log output as "key: <instance>/<hex address>".
+ return fmt.Sprintf("key: %d/%x", k.Instance, k.PduSource) // NOTE(review): k.Instance is a *uint32, so %d formats the pointer value (0 when nil) rather than the instance number — dereference with a nil check if the number is wanted
+}
+
+type DeviceInfoCache struct { // DeviceInfoCache holds DeviceInfo records indexed by DeviceInfoCacheKey.HashKey().
+ cache map[uint32]DeviceInfo // values, not pointers: every read yields a copy and changes must be written back; NOTE(review): no lock guards this map here — confirm single-goroutine use
+}
+
+func NewDeviceInfoCache() *DeviceInfoCache { // NewDeviceInfoCache returns an empty cache whose map is already allocated.
+ return &DeviceInfoCache{
+ cache: make(map[uint32]DeviceInfo),
+ }
+}
+
+// HasDeviceInfo reports whether the cache holds an entry stored under key.HashKey().
+func (i *DeviceInfoCache) HasDeviceInfo(key DeviceInfoCacheKey) bool {
+ _, ok := i.cache[key.HashKey()]
+ return ok
+}
+
+// IAmDeviceInfo creates or refreshes the cached record for the device announced by an I-Am request received from pduSource.
+func (i *DeviceInfoCache) IAmDeviceInfo(iAm readWriteModel.BACnetUnconfirmedServiceRequestIAm, pduSource []byte) {
+ log.Debug().Msgf("IAmDeviceInfo\n%s", iAm)
+
+ deviceIdentifier := iAm.GetDeviceIdentifier()
+ // the instance number is the first lookup key
+ deviceInstance := deviceIdentifier.GetInstanceNumber()
+
+ // first try: an entry stored under the instance-only key
+ deviceInfo, ok := i.cache[DeviceInfoCacheKey{&deviceInstance, nil}.HashKey()]
+
+ // second try: an entry stored under the address-only key
+ if !ok {
+ deviceInfo, ok = i.cache[DeviceInfoCacheKey{nil, pduSource}.HashKey()]
+ }
+
+ // nothing cached yet: start a fresh record (built directly, without the NewDeviceInfo defaults)
+ if !ok {
+ deviceInfo = DeviceInfo{
+ DeviceIdentifier: deviceIdentifier.GetPayload(),
+ Address: pduSource,
+ }
+ }
+
+ // overwrite with the values announced in the I-Am; identifier and address of an existing record are left as they were
+ maximumApduLengthAccepted := readWriteModel.MaxApduLengthAccepted(iAm.GetMaximumApduLengthAcceptedLength().GetActualValue()) // NOTE(review): converts the raw number straight into the enum type — confirm the enum's values are octet counts and not codes
+ deviceInfo.MaximumApduLengthAccepted = &maximumApduLengthAccepted
+ sementationSupported := iAm.GetSegmentationSupported().GetValue()
+ deviceInfo.SegmentationSupported = &sementationSupported
+ vendorId := iAm.GetVendorId().GetValue()
+ deviceInfo.VendorId = &vendorId
+
+ // deviceInfo is a local copy, so it has to be written back through the cache
+ i.UpdateDeviceInfo(deviceInfo)
+}
+
+// GetDeviceInfo returns a copy of the record stored under key.HashKey() and whether such a record exists.
+func (i *DeviceInfoCache) GetDeviceInfo(key DeviceInfoCacheKey) (DeviceInfo, bool) {
+ log.Debug().Msgf("GetDeviceInfo %s", key)
+
+ // on a miss the map yields the zero DeviceInfo and ok == false
+ deviceInfo, ok := i.cache[key.HashKey()]
+ log.Debug().Msgf("deviceInfo: %#v", deviceInfo)
+
+ return deviceInfo, ok
+}
+
+// UpdateDeviceInfo stores deviceInfo in the cache after the application has changed one or more of its fields,
+// re-keying it when its instance number or address differs from the key recorded in the record. If this is a
+// cached version of a persistent record then this is the opportunity to update the database.
+func (i *DeviceInfoCache) UpdateDeviceInfo(deviceInfo DeviceInfo) {
+ log.Debug().Msgf("UpdateDeviceInfo %#v", deviceInfo)
+
+ // key recorded by the previous update (zero value for a record that was never stored)
+ cacheKey := deviceInfo._cacheKey
+ if cacheKey.Instance != nil && deviceInfo.DeviceIdentifier.GetInstanceNumber() != *cacheKey.Instance { // instance number changed since the record was last keyed
+ instanceNumber := deviceInfo.DeviceIdentifier.GetInstanceNumber()
+ cacheKey.Instance = &instanceNumber // NOTE(review): the key is overwritten before the delete below, so the delete targets the new-instance hash rather than the previously stored key — confirm intent
+ delete(i.cache, cacheKey.HashKey())
+ i.cache[DeviceInfoCacheKey{Instance: &instanceNumber}.HashKey()] = deviceInfo // index by instance number alone
+ }
+ if bytes.Compare(deviceInfo.Address, cacheKey.PduSource) != 0 { // address differs from the recorded key; also true for a never-stored record with a non-empty address
+ cacheKey.PduSource = deviceInfo.Address // NOTE(review): same pattern — the delete below uses the key after it was changed
+ delete(i.cache, cacheKey.HashKey())
+ i.cache[DeviceInfoCacheKey{PduSource: cacheKey.PduSource}.HashKey()] = deviceInfo // index by address alone
+ }
+
+ // record the combined instance+address key on this copy and store the copy under that key
+ instanceNumber := deviceInfo.DeviceIdentifier.GetInstanceNumber()
+ deviceInfo._cacheKey = DeviceInfoCacheKey{
+ Instance: &instanceNumber,
+ PduSource: deviceInfo.Address,
+ }
+ i.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo // NOTE(review): copies stored in the branches above still carry the previous _cacheKey, and the caller's value is not updated (passed by value)
+}
+
+// Acquire returns the record stored under key.HashKey() and, when one exists, increments its reference count to
+// mark it as in use by a segmentation state machine; the returned copy already carries the incremented count.
+func (i *DeviceInfoCache) Acquire(key DeviceInfoCacheKey) (DeviceInfo, bool) {
+ log.Debug().Msgf("Acquire %#v", key)
+
+ deviceInfo, ok := i.cache[key.HashKey()]
+ if ok {
+ deviceInfo._refCount++
+ i.cache[key.HashKey()] = deviceInfo // write the modified copy back; the map holds values, not pointers
+ }
+
+ return deviceInfo, ok
+}
+
+// Release is intended to be called by the segmentation state machine once it has finished with the device information; it returns an error when the passed record's reference count is already zero.
+func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
+
+ // the record may be shared by several SSMs; NOTE(review): the count checked here is the one on the passed copy, not the cached entry
+ if deviceInfo._refCount == 0 {
+ return errors.New("reference count")
+ }
+
+ // decrement the reference count and store the copy back under its recorded key
+ deviceInfo._refCount--
+ i.cache[deviceInfo._cacheKey.HashKey()] = deviceInfo // NOTE(review): uses _cacheKey, which may differ from the key the record was acquired under — verify both resolve to the same entry
+ return nil
+}
+
// TODO: implement
type Application struct {
ApplicationServiceElement
@@ -58,7 +232,7 @@ type BIPSimpleApplication struct {
mux *UDPMultiplexer
}
-func NewBIPSimpleApplication(localDevice DeviceEntry, localAddress, deviceInfoCache *DeviceInventory, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
b := &BIPSimpleApplication{}
var err error
b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go
index cbf38a66aa..cf0e825ddd 100644
--- a/plc4go/internal/bacnetip/Device.go
+++ b/plc4go/internal/bacnetip/Device.go
@@ -19,6 +19,8 @@
package bacnetip
+import readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+
type WhoIsIAmServices struct {
}
@@ -26,3 +28,23 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
// TODO: implement me
return nil, nil
}
+
+type LocalDeviceObject struct { // LocalDeviceObject carries the local device's APDU and segmentation settings.
+ NumberOfAPDURetries uint // copied into each SSM by NewSSM
+ APDUTimeout uint // copied into each SSM by NewSSM; unit not visible here — TODO confirm
+ SegmentationSupported readWriteModel.BACnetSegmentation
+ APDUSegmentTimeout uint // unit not visible here — TODO confirm
+ MaxSegmentsAccepted *readWriteModel.MaxSegmentsAccepted // nil when unset
+ MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted // nil when unset
+}
+
+func NewLocalDeviceObject() *LocalDeviceObject { // NewLocalDeviceObject returns a LocalDeviceObject with every field explicitly set to its zero value.
+ return &LocalDeviceObject{
+ NumberOfAPDURetries: 0,
+ APDUTimeout: 0,
+ SegmentationSupported: 0, // NOTE(review): which BACnetSegmentation value 0 denotes is not visible here — confirm it is the intended default
+ APDUSegmentTimeout: 0,
+ MaxSegmentsAccepted: nil,
+ MaximumApduLengthAccepted: nil,
+ }
+}
diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go
index 350b3c0393..86f3f5c690 100644
--- a/plc4go/internal/bacnetip/DeviceInventory.go
+++ b/plc4go/internal/bacnetip/DeviceInventory.go
@@ -26,6 +26,7 @@ import (
"time"
)
+// TODO: migrate into device info cache
type DeviceInventory struct {
sync.RWMutex
devices map[string]DeviceEntry