You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/14 15:42:07 UTC

[plc4x] branch develop updated: feat(plc4go/bacnet): use upstream device info cache

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 83606befb0 feat(plc4go/bacnet): use upstream device info cache
83606befb0 is described below

commit 83606befb06f9012aa68339dd328d068a83b9a54
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Nov 14 14:16:31 2022 +0100

    feat(plc4go/bacnet): use upstream device info cache
---
 plc4go/internal/bacnetip/ApplicationLayer.go  |  71 ++++++-----
 plc4go/internal/bacnetip/ApplicationModule.go | 176 +++++++++++++++++++++++++-
 plc4go/internal/bacnetip/Device.go            |  22 ++++
 plc4go/internal/bacnetip/DeviceInventory.go   |   1 +
 4 files changed, 235 insertions(+), 35 deletions(-)

diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go
index c2a1324adc..04dc50f415 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -75,8 +75,8 @@ type segmentAPDU struct {
 type SSMSAPRequirements interface {
 	_ServiceAccessPoint
 	_Client
-	GetDeviceInventory() *DeviceInventory
-	GetLocalDevice() DeviceEntry
+	GetDeviceInfoCache() *DeviceInfoCache
+	GetLocalDevice() LocalDeviceObject
 	GetProposedWindowSize() uint8
 	GetClientTransactions() []*ClientSSM
 	GetServerTransactions() []*ServerSSM
@@ -89,8 +89,8 @@ type SSM struct {
 
 	ssmSAP SSMSAPRequirements
 
-	pduAddress  []byte
-	deviceEntry *DeviceEntry
+	pduAddress []byte
+	deviceInfo *DeviceInfo
 
 	invokeId uint8
 
@@ -116,15 +116,16 @@ type SSM struct {
 
 func NewSSM(sap SSMSAPRequirements, pduAddress []byte) (SSM, error) {
 	log.Debug().Interface("sap", sap).Bytes("pdu_address", pduAddress).Msg("init")
-	deviceEntry, err := sap.GetDeviceInventory().getEntryForDestination(pduAddress)
-	if err != nil {
-		return SSM{}, errors.Wrap(err, "Can't create SSM")
+	var deviceInfo *DeviceInfo
+	deviceInfoTemp, ok := sap.GetDeviceInfoCache().GetDeviceInfo(DeviceInfoCacheKey{PduSource: pduAddress})
+	if ok {
+		deviceInfo = &deviceInfoTemp
 	}
 	localDevice := sap.GetLocalDevice()
 	return SSM{
 		ssmSAP:                sap,
 		pduAddress:            pduAddress,
-		deviceEntry:           deviceEntry,
+		deviceInfo:            deviceInfo,
 		state:                 IDLE,
 		numberOfApduRetries:   localDevice.NumberOfAPDURetries,
 		apduTimeout:           localDevice.APDUTimeout,
@@ -342,7 +343,7 @@ func NewClientSSM(sap SSMSAPRequirements, pduAddress []byte) (*ClientSSM, error)
 		return nil, err
 	}
 	// TODO: if deviceEntry is not there get it now...
-	if ssm.deviceEntry == nil {
+	if ssm.deviceInfo == nil {
 		// TODO: get entry for device, store it in inventory
 		log.Debug().Msg("Accquire device information")
 	}
@@ -362,7 +363,7 @@ func (s *ClientSSM) setState(newState SSMState, timer *uint) error {
 	if s.state == COMPLETED || s.state == ABORTED {
 		log.Debug().Msg("remove from active transaction")
 		s.ssmSAP.GetClientTransactions() // TODO remove "this" transaction from the list
-		if s.deviceEntry == nil {
+		if s.deviceInfo == nil {
 			// TODO: release device entry
 			log.Debug().Msg("release device entry")
 		}
@@ -395,13 +396,13 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 
 	// if the max apdu length of the server isn't known, assume that it is the same size as our own and will be the segment
 	//        size
-	if s.deviceEntry == nil || s.deviceEntry.MaximumApduLengthAccepted != nil {
+	if s.deviceInfo == nil || s.deviceInfo.MaximumApduLengthAccepted != nil {
 		s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets())
-	} else if s.deviceEntry.MaximumNpduLength == nil {
+	} else if s.deviceInfo.MaximumNpduLength == nil {
 		//      if the max npdu length of the server isn't known, assume that it is the same as the max apdu length accepted
 		s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets())
 	} else {
-		s.segmentSize = utils.Min(*s.deviceEntry.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets()))
+		s.segmentSize = utils.Min(*s.deviceInfo.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets()))
 	}
 	log.Debug().Msgf("segment size %d", s.segmentSize)
 
@@ -426,9 +427,9 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 			return s.Response(abort)
 		}
 
-		if s.deviceEntry == nil {
+		if s.deviceInfo == nil {
 			log.Debug().Msg("no server info for segmentation support")
-		} else if s.deviceEntry.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT && s.deviceEntry.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_BOTH {
+		} else if *s.deviceInfo.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT && *s.deviceInfo.SegmentationSupported != readWriteModel.BACnetSegmentation_SEGMENTED_BOTH {
 			log.Debug().Msg("server can't receive segmented requests")
 			abort, err := s.abort(readWriteModel.BACnetAbortReason_SEGMENTATION_NOT_SUPPORTED)
 			if err != nil {
@@ -438,11 +439,11 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 		}
 
 		// make sure we don't exceed the number of segments in our request that the server said it was willing to accept
-		if s.deviceEntry == nil {
+		if s.deviceInfo == nil {
 			log.Debug().Msg("no server info for maximum number of segments")
-		} else if s.deviceEntry.MaxSegmentsAccepted == nil {
+		} else if s.deviceInfo.MaxSegmentsAccepted == nil {
 			log.Debug().Msgf("server doesn't say maximum number of segments")
-		} else if s.segmentCount > s.deviceEntry.MaxSegmentsAccepted.MaxSegments() {
+		} else if s.segmentCount > s.deviceInfo.MaxSegmentsAccepted.MaxSegments() {
 			log.Debug().Msg("server can't receive enough segments")
 			abort, err := s.abort(readWriteModel.BACnetAbortReason_APDU_TOO_LONG)
 			if err != nil {
@@ -909,7 +910,7 @@ func NewServerSSM(sap SSMSAPRequirements, pduAddress []byte) (*ServerSSM, error)
 		return nil, err
 	}
 	// TODO: if deviceEntry is not there get it now...
-	if &ssm.deviceEntry == nil {
+	if &ssm.deviceInfo == nil {
 		// TODO: get entry for device, store it in inventory
 		log.Debug().Msg("Accquire device information")
 	}
@@ -930,7 +931,7 @@ func (s *ServerSSM) setState(newState SSMState, timer *uint) error {
 	if s.state == COMPLETED || s.state == ABORTED {
 		log.Debug().Msg("remove from active transaction")
 		s.ssmSAP.GetServerTransactions() // TODO remove "this" transaction from the list
-		if s.deviceEntry != nil {
+		if s.deviceInfo != nil {
 			// TODO: release device entry
 			log.Debug().Msg("release device entry")
 		}
@@ -1020,10 +1021,10 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 
 		// the segment size is the minimum of the size of the largest packet that can be delivered to the client and the
 		//            largest it can accept
-		if s.deviceEntry == nil || s.deviceEntry.MaximumNpduLength == nil {
+		if s.deviceInfo == nil || s.deviceInfo.MaximumNpduLength == nil {
 			s.segmentSize = uint(s.maxApduLengthAccepted.NumberOfOctets())
 		} else {
-			s.segmentSize = utils.Min(*s.deviceEntry.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets()))
+			s.segmentSize = utils.Min(*s.deviceInfo.MaximumNpduLength, uint(s.maxApduLengthAccepted.NumberOfOctets()))
 		}
 
 		// compute the segment count
@@ -1158,16 +1159,18 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
 	s.segmentedResponseAccepted = apduConfirmedRequest.GetSegmentedResponseAccepted()
 
 	// if there is a cache record, check to see if it needs to be updated
-	if apduConfirmedRequest.GetSegmentedResponseAccepted() && s.deviceEntry != nil {
-		switch s.deviceEntry.SegmentationSupported {
+	if apduConfirmedRequest.GetSegmentedResponseAccepted() && s.deviceInfo != nil {
+		switch *s.deviceInfo.SegmentationSupported {
 		case readWriteModel.BACnetSegmentation_NO_SEGMENTATION:
 			log.Debug().Msg("client actually supports segmented receive")
-			s.deviceEntry.SegmentationSupported = readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE
+			segmentedReceive := readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE
+			s.deviceInfo.SegmentationSupported = &segmentedReceive
 
 		// TODO: bacpypes updates the cache here but as we have a pointer  to the entry we should need that. Maybe we should because concurrency... lets see later
 		case readWriteModel.BACnetSegmentation_SEGMENTED_TRANSMIT:
 			log.Debug().Msg("client actually supports both segmented transmit and receive")
-			s.deviceEntry.SegmentationSupported = readWriteModel.BACnetSegmentation_SEGMENTED_BOTH
+			segmentedBoth := readWriteModel.BACnetSegmentation_SEGMENTED_BOTH
+			s.deviceInfo.SegmentationSupported = &segmentedBoth
 
 			// TODO: bacpypes updates the cache here but as we have a pointer  to the entry we should need that. Maybe we should because concurrency... lets see later
 		case readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE, readWriteModel.BACnetSegmentation_SEGMENTED_BOTH:
@@ -1182,11 +1185,11 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
 	//        received
 	getMaxApduLengthAccepted := apduConfirmedRequest.GetMaxApduLengthAccepted()
 	s.maxApduLengthAccepted = &getMaxApduLengthAccepted
-	if s.deviceEntry != nil && s.deviceEntry.MaximumApduLengthAccepted != nil {
-		if *s.deviceEntry.MaximumApduLengthAccepted < *s.maxApduLengthAccepted {
+	if s.deviceInfo != nil && s.deviceInfo.MaximumApduLengthAccepted != nil {
+		if *s.deviceInfo.MaximumApduLengthAccepted < *s.maxApduLengthAccepted {
 			log.Debug().Msg("apdu max reponse encoding error")
 		} else {
-			s.maxApduLengthAccepted = s.deviceEntry.MaximumApduLengthAccepted
+			s.maxApduLengthAccepted = s.deviceInfo.MaximumApduLengthAccepted
 		}
 	}
 	log.Debug().Msgf("maxApduLengthAccepted %s", *s.maxApduLengthAccepted)
@@ -1459,8 +1462,8 @@ type StateMachineAccessPoint struct {
 	*Client
 	*ServiceAccessPoint
 
-	localDevice           DeviceEntry
-	deviceInventory       *DeviceInventory
+	localDevice           LocalDeviceObject
+	deviceInventory       *DeviceInfoCache
 	nextInvokeId          uint8
 	clientTransactions    []*ClientSSM
 	serverTransactions    []*ServerSSM
@@ -1475,7 +1478,7 @@ type StateMachineAccessPoint struct {
 	applicationTimeout    uint
 }
 
-func NewStateMachineAccessPoint(localDevice DeviceEntry, deviceInventory *DeviceInventory, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
+func NewStateMachineAccessPoint(localDevice LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
 	log.Debug().Msgf("NewStateMachineAccessPoint localDevice=%v deviceInventory=%v sap=%v cid=%v", localDevice, deviceInventory, sapID, cid)
 
 	s := &StateMachineAccessPoint{
@@ -1788,11 +1791,11 @@ func (s *StateMachineAccessPoint) SapConfirmation(apdu readWriteModel.APDU, pduD
 	return nil
 }
 
-func (s *StateMachineAccessPoint) GetDeviceInventory() *DeviceInventory {
+func (s *StateMachineAccessPoint) GetDeviceInfoCache() *DeviceInfoCache {
 	return s.deviceInventory
 }
 
-func (s *StateMachineAccessPoint) GetLocalDevice() DeviceEntry {
+func (s *StateMachineAccessPoint) GetLocalDevice() LocalDeviceObject {
 	return s.localDevice
 }
 
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 17aba59782..0ae673addd 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -20,10 +20,184 @@
 package bacnetip
 
 import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
+	"hash/fnv"
 )
 
+// DeviceInfo is the record DeviceInfoCache stores for a device.
+type DeviceInfo struct {
+	// DeviceIdentifier identifies the device; its instance number becomes part of the cache key in
+	// DeviceInfoCache.UpdateDeviceInfo.
+	DeviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier
+	// Address is compared against (and copied into) the PduSource of the cache key in
+	// DeviceInfoCache.UpdateDeviceInfo.
+	Address          []byte
+
+	// The pointer fields below may be nil. MaximumApduLengthAccepted, SegmentationSupported and VendorId are
+	// overwritten from an I-Am request in DeviceInfoCache.IAmDeviceInfo.
+	// NOTE(review): nil presumably means "not known yet" - confirm against the users of this record.
+	MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
+	SegmentationSupported     *readWriteModel.BACnetSegmentation
+	MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
+	VendorId                  *readWriteModel.BACnetVendorId
+	MaximumNpduLength         *uint
+
+	// _refCount is incremented by DeviceInfoCache.Acquire and decremented by DeviceInfoCache.Release.
+	_refCount int
+	// _cacheKey is written by DeviceInfoCache.UpdateDeviceInfo and read by DeviceInfoCache.Release.
+	_cacheKey DeviceInfoCacheKey
+}
+
+// NewDeviceInfo creates a DeviceInfo for the given identifier and address. MaximumApduLengthAccepted is
+// preset to 1024 octets and SegmentationSupported to "no segmentation"; all other optional fields stay nil.
+func NewDeviceInfo(deviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier, address []byte) *DeviceInfo {
+	// the defaults are copied into locals because the record stores pointers to them
+	defaultMaxApduLength := readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
+	defaultSegmentation := readWriteModel.BACnetSegmentation_NO_SEGMENTATION
+
+	info := new(DeviceInfo)
+	info.DeviceIdentifier = deviceIdentifier
+	info.Address = address
+	info.MaximumApduLengthAccepted = &defaultMaxApduLength
+	info.SegmentationSupported = &defaultSegmentation
+	return info
+}
+
+// DeviceInfoCacheKey caches by either Instance, PduSource or both.
+// The field order must not change: the key is also built with positional literals.
+type DeviceInfoCacheKey struct {
+	// Instance is the device instance number; nil leaves the instance out of the key.
+	Instance  *uint32
+	// PduSource is the address of the device; nil/empty leaves the address out of the key.
+	PduSource []byte
+}
+
+func (k DeviceInfoCacheKey) HashKey() uint32 {
+	h := fnv.New32a()
+	if k.Instance != nil {
+		_ = binary.Write(h, binary.BigEndian, *k.Instance)
+	}
+	_, _ = h.Write(k.PduSource)
+	return h.Sum32()
+}
+
+// String renders the key as "key: <instance>/<hex encoded pdu source>".
+// Instance is a pointer, so it is dereferenced here; formatting the pointer itself with %d would print its
+// address instead of the instance number. A key without an instance is rendered as "key: <nil>/...".
+func (k DeviceInfoCacheKey) String() string {
+	if k.Instance == nil {
+		return fmt.Sprintf("key: <nil>/%x", k.PduSource)
+	}
+	return fmt.Sprintf("key: %d/%x", *k.Instance, k.PduSource)
+}
+
+// DeviceInfoCache keeps DeviceInfo records addressable by DeviceInfoCacheKey.
+type DeviceInfoCache struct {
+	// cache maps DeviceInfoCacheKey.HashKey() to the record. Records are stored by value, so a record read
+	// from the map is a copy and has to be written back to make a change visible.
+	cache map[uint32]DeviceInfo
+}
+
+// NewDeviceInfoCache creates an empty, ready to use DeviceInfoCache.
+func NewDeviceInfoCache() *DeviceInfoCache {
+	deviceInfoCache := new(DeviceInfoCache)
+	// the map must be initialised, writing to a nil map panics
+	deviceInfoCache.cache = map[uint32]DeviceInfo{}
+	return deviceInfoCache
+}
+
+// HasDeviceInfo reports whether the cache holds a record under the hash of the supplied key.
+func (i *DeviceInfoCache) HasDeviceInfo(key DeviceInfoCacheKey) bool {
+	if _, found := i.cache[key.HashKey()]; found {
+		return true
+	}
+	return false
+}
+
+// IAmDeviceInfo builds or refreshes the device information record described by an IAmRequest and hands it
+// to UpdateDeviceInfo. An existing record is looked up by device instance first and by pduSource second;
+// if neither exists a new record is started from the request's device identifier and pduSource.
+func (i *DeviceInfoCache) IAmDeviceInfo(iAm readWriteModel.BACnetUnconfirmedServiceRequestIAm, pduSource []byte) {
+	log.Debug().Msgf("IAmDeviceInfo\n%s", iAm)
+
+	identifier := iAm.GetDeviceIdentifier()
+	instance := identifier.GetInstanceNumber()
+
+	// prefer the record known under the instance, fall back to the one known under the address
+	record, known := i.cache[DeviceInfoCacheKey{Instance: &instance}.HashKey()]
+	if !known {
+		record, known = i.cache[DeviceInfoCacheKey{PduSource: pduSource}.HashKey()]
+		if !known {
+			// nothing cached yet, start a fresh record
+			record = DeviceInfo{
+				DeviceIdentifier: identifier.GetPayload(),
+				Address:          pduSource,
+			}
+		}
+	}
+
+	// take over the values announced by the device; locals are needed because the record stores pointers
+	announcedMaxApduLength := readWriteModel.MaxApduLengthAccepted(iAm.GetMaximumApduLengthAcceptedLength().GetActualValue())
+	record.MaximumApduLengthAccepted = &announcedMaxApduLength
+	announcedSegmentation := iAm.GetSegmentationSupported().GetValue()
+	record.SegmentationSupported = &announcedSegmentation
+	announcedVendorId := iAm.GetVendorId().GetValue()
+	record.VendorId = &announcedVendorId
+
+	// record is a copy, so it has to be written back through the cache
+	i.UpdateDeviceInfo(record)
+}
+
+// GetDeviceInfo returns a copy of the record stored under the hash of key and whether such a record exists.
+// On a miss the returned DeviceInfo is the zero value.
+func (i *DeviceInfoCache) GetDeviceInfo(key DeviceInfoCacheKey) (DeviceInfo, bool) {
+	log.Debug().Msgf("GetDeviceInfo %s", key)
+
+	hash := key.HashKey()
+	entry, found := i.cache[hash]
+	// logged on a miss too, in which case the zero value is printed
+	log.Debug().Msgf("deviceInfo: %#v", entry)
+
+	return entry, found
+}
+
+// UpdateDeviceInfo The application has updated one or more fields in the device information record and the cache needs
+//        to be updated to reflect the changes.  If this is a cached version of a persistent record then this is the
+//        opportunity to update the database.
+//
+// The record is indexed three times: by its instance number alone, by its address alone (only when the address
+// is not empty) and by the combination of both, which is also remembered in the record as its cache key.
+// Index entries that belong to a previous instance number or address of the record are removed. The removal
+// uses the key the record was stored under so far, and the cache key of the record is refreshed before the
+// record is stored, so that every stored copy carries the current key.
+func (i *DeviceInfoCache) UpdateDeviceInfo(deviceInfo DeviceInfo) {
+	log.Debug().Msgf("UpdateDeviceInfo %#v", deviceInfo)
+
+	// the key the record was stored under so far (zero value for a record that was never stored)
+	oldKey := deviceInfo._cacheKey
+
+	// the key the record is stored under from now on
+	instanceNumber := deviceInfo.DeviceIdentifier.GetInstanceNumber()
+	newKey := DeviceInfoCacheKey{
+		Instance:  &instanceNumber,
+		PduSource: deviceInfo.Address,
+	}
+
+	instanceChanged := oldKey.Instance != nil && *oldKey.Instance != instanceNumber
+	addressChanged := !bytes.Equal(oldKey.PduSource, deviceInfo.Address)
+
+	// remove the index entries of the outdated identity, always addressed by the old key
+	if instanceChanged {
+		delete(i.cache, DeviceInfoCacheKey{Instance: oldKey.Instance}.HashKey())
+	}
+	if addressChanged && len(oldKey.PduSource) > 0 {
+		delete(i.cache, DeviceInfoCacheKey{PduSource: oldKey.PduSource}.HashKey())
+	}
+	if oldKey.Instance != nil && (instanceChanged || addressChanged) {
+		delete(i.cache, oldKey.HashKey())
+	}
+
+	// update the key first, the map stores copies of the record
+	deviceInfo._cacheKey = newKey
+
+	i.cache[DeviceInfoCacheKey{Instance: newKey.Instance}.HashKey()] = deviceInfo
+	if len(newKey.PduSource) > 0 {
+		// an empty address would hash like an empty key, so it is not used as an index on its own
+		i.cache[DeviceInfoCacheKey{PduSource: newKey.PduSource}.HashKey()] = deviceInfo
+	}
+	i.cache[newKey.HashKey()] = deviceInfo
+}
+
+// Acquire looks up the record stored under the hash of key. On a hit the reference count of the record is
+// incremented, the record is written back under the same hash, and the incremented copy is returned together
+// with true. On a miss the zero value and false are returned and the cache is left untouched.
+func (i *DeviceInfoCache) Acquire(key DeviceInfoCacheKey) (DeviceInfo, bool) {
+	log.Debug().Msgf("Acquire %#v", key)
+
+	hash := key.HashKey()
+	entry, found := i.cache[hash]
+	if !found {
+		return entry, false
+	}
+
+	// entry is a copy, write the incremented count back
+	entry._refCount++
+	i.cache[hash] = entry
+	return entry, true
+}
+
+// Release is the counterpart of Acquire: it decrements the reference count of the supplied record and stores
+// the record under the hash of the record's own cache key. An error is returned, and nothing is stored, when
+// the reference count of the supplied record is already zero.
+func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
+	// a record may be shared by several users, a count of zero means there is nothing left to release
+	if deviceInfo._refCount == 0 {
+		return errors.New("reference count")
+	}
+
+	deviceInfo._refCount -= 1
+	hash := deviceInfo._cacheKey.HashKey()
+	i.cache[hash] = deviceInfo
+	return nil
+}
+
 // TODO: implement
 type Application struct {
 	ApplicationServiceElement
@@ -58,7 +232,7 @@ type BIPSimpleApplication struct {
 	mux          *UDPMultiplexer
 }
 
-func NewBIPSimpleApplication(localDevice DeviceEntry, localAddress, deviceInfoCache *DeviceInventory, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
 	b := &BIPSimpleApplication{}
 	var err error
 	b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go
index cbf38a66aa..cf0e825ddd 100644
--- a/plc4go/internal/bacnetip/Device.go
+++ b/plc4go/internal/bacnetip/Device.go
@@ -19,6 +19,8 @@
 
 package bacnetip
 
+import readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+
 type WhoIsIAmServices struct {
 }
 
@@ -26,3 +28,23 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
 	// TODO: implement me
 	return nil, nil
 }
+
+// LocalDeviceObject holds the APDU and segmentation settings of the local device.
+// NOTE(review): the units of the timeout fields are not visible here - confirm against the users.
+type LocalDeviceObject struct {
+	// NumberOfAPDURetries is copied into each SSM as its number of APDU retries (see NewSSM).
+	NumberOfAPDURetries       uint
+	// APDUTimeout is copied into each SSM as its APDU timeout (see NewSSM).
+	APDUTimeout               uint
+	SegmentationSupported     readWriteModel.BACnetSegmentation
+	APDUSegmentTimeout        uint
+	// MaxSegmentsAccepted and MaximumApduLengthAccepted are pointers and may be nil.
+	MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
+	MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
+}
+
+// NewLocalDeviceObject creates a LocalDeviceObject with every field at its zero value: all counters and
+// timeouts are 0, SegmentationSupported is 0 and both pointer fields are nil.
+func NewLocalDeviceObject() *LocalDeviceObject {
+	return new(LocalDeviceObject)
+}
diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go
index 350b3c0393..86f3f5c690 100644
--- a/plc4go/internal/bacnetip/DeviceInventory.go
+++ b/plc4go/internal/bacnetip/DeviceInventory.go
@@ -26,6 +26,7 @@ import (
 	"time"
 )
 
+// TODO: migrate into device info cache
 type DeviceInventory struct {
 	sync.RWMutex
 	devices map[string]DeviceEntry