You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/03 15:01:06 UTC
[plc4x] branch develop updated: feat(plc4go/bacnet): initial skeleton of TransactionStateMachine
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new 49551174a feat(plc4go/bacnet): initial skeleton of TransactionStateMachine
49551174a is described below
commit 49551174ac54c88b4a93c70ae4697c3ca6ab8444
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Nov 3 16:00:58 2022 +0100
feat(plc4go/bacnet): initial skeleton of TransactionStateMachine
---
plc4go/internal/bacnetip/DeviceInventory.go | 82 ++++++++++++
plc4go/internal/bacnetip/Driver.go | 3 +-
plc4go/internal/bacnetip/MessageCodec.go | 14 +++
plc4go/internal/bacnetip/Reader.go | 2 +-
.../internal/bacnetip/TransactionStateMachine.go | 139 +++++++++++++++++++++
5 files changed, 238 insertions(+), 2 deletions(-)
diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go
new file mode 100644
index 000000000..a06701777
--- /dev/null
+++ b/plc4go/internal/bacnetip/DeviceInventory.go
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package bacnetip
+
+import (
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "github.com/pkg/errors"
+ "math"
+ "sync"
+ "time"
+)
+
+type DeviceInventory struct {
+ sync.RWMutex
+ devices map[string]DeviceEntry
+}
+
+func (d *DeviceInventory) getEntryForDestination(destination []uint8) (DeviceEntry, error) {
+ d.RLock()
+ defer d.RUnlock()
+ deviceKey := string(destination)
+ deviceEntry, ok := d.devices[deviceKey]
+ if !ok {
+ return NoDeviceEntry, errors.Errorf("no entry found for device key %s", deviceKey)
+ }
+ return deviceEntry, nil
+}
+
+var NoDeviceEntry = DeviceEntry{
+ MaximumApduLengthAcceptedLength: readWriteModel.NewBACnetTagPayloadUnsignedInteger(nil, nil, nil, nil, nil, nil, nil, func() *uint64 {
+ var maxUint64 uint64 = math.MaxUint64
+ return &maxUint64
+ }(), 4),
+}
+
+type DeviceEntry struct {
+ DeviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier
+ MaximumApduLengthAcceptedLength readWriteModel.BACnetTagPayloadUnsignedInteger
+ SegmentationSupported bool
+ VendorId readWriteModel.BACnetVendorId
+ DeviceObjects []DeviceObject
+}
+
+func (d DeviceEntry) GetDeviceObjects(filter ...DeviceObjectFilter) []DeviceObject {
+ var deviceObjects []DeviceObject
+ for _, object := range d.DeviceObjects {
+ shouldBeAdded := true
+ for _, objectFilter := range filter {
+ shouldBeAdded = shouldBeAdded && objectFilter(object)
+ }
+ if shouldBeAdded {
+ deviceObjects = append(deviceObjects, object)
+ }
+ }
+ return deviceObjects
+}
+
+type DeviceObjectFilter func(DeviceObject) bool
+
+type DeviceObject struct {
+ ObjectName string
+ ObjectIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier
+ CachedObjectValue interface{}
+ TimeOfCache time.Time
+}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 72176c15b..fd181cad7 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -42,6 +42,7 @@ type Driver struct {
tm spi.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
+ DeviceInventory DeviceInventory
}
func NewDriver() plc4go.PlcDriver {
@@ -121,7 +122,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
return ch
}
- codec := NewMessageCodec(transportInstance)
+ codec := NewApplicationLayerMessageCodec(transportInstance, &m.DeviceInventory)
log.Debug().Msgf("working with codec %#v", codec)
// Create the new connection
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index f8fe48d7c..8675796d8 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -28,6 +28,20 @@ import (
"github.com/rs/zerolog/log"
)
+// ApplicationLayerMessageCodec is a wrapper for MessageCodec which takes care of segmentation, retries etc.
+type ApplicationLayerMessageCodec struct {
+ TransactionStateMachine
+}
+
+func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec {
+ return &ApplicationLayerMessageCodec{
+ TransactionStateMachine{
+ MessageCodec: NewMessageCodec(transportInstance),
+ deviceInventory: deviceInventory,
+ },
+ }
+}
+
type MessageCodec struct {
_default.DefaultCodec
}
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index a950cccb5..e2720e395 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -189,7 +189,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
// Send the over the wire
log.Trace().Msg("Send ")
if err := m.messageCodec.SendRequest(ctx, bvlc, func(message spi.Message) bool {
- bvlc, ok := message.(readWriteModel.BVLC)
+ bvlc, ok := message.(readWriteModel.BVLCExactly)
if !ok {
log.Debug().Msgf("Received strange type %T", bvlc)
return false
diff --git a/plc4go/internal/bacnetip/TransactionStateMachine.go b/plc4go/internal/bacnetip/TransactionStateMachine.go
new file mode 100644
index 000000000..21b0c3147
--- /dev/null
+++ b/plc4go/internal/bacnetip/TransactionStateMachine.go
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package bacnetip
+
+import (
+ "context"
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+ "github.com/apache/plc4x/plc4go/spi"
+ "github.com/pkg/errors"
+ "github.com/rs/zerolog/log"
+ "time"
+)
+
+// TransactionStateMachine is the implementation of the bacnet transaction state machine
+type TransactionStateMachine struct {
+ *MessageCodec
+ deviceInventory *DeviceInventory
+ retryCount int
+ segmentRetryCount int
+ duplicateCount int
+ sentAllSegments bool
+ lastSequenceNumber int
+ initialSequenceNumber int
+ actualWindowSize int
+ proposeWindowSize int
+ segmentTimer int
+ RequestTimer int
+}
+
+func (t *TransactionStateMachine) GetCodec() spi.MessageCodec {
+ return t
+}
+
+func (t *TransactionStateMachine) Send(message spi.Message) error {
+ if handled, err := t.handleOutboundMessage(message); handled {
+ return nil
+ } else if err != nil {
+ return errors.Wrap(err, "Error handling message")
+ } else {
+ return t.MessageCodec.Send(message)
+ }
+}
+
+func (t *TransactionStateMachine) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+ // TODO: detect overflow
+ return t.MessageCodec.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
+}
+
+func (t *TransactionStateMachine) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+ // Note: this code is copied on purpose from default codec as we want to call "this" `Send` and `Expect`
+ if err := ctx.Err(); err != nil {
+ return errors.Wrap(err, "Not sending message as context is aborted")
+ }
+ log.Trace().Msg("Sending request")
+ // Send the actual message
+ err := t.Send(message)
+ if err != nil {
+ return errors.Wrap(err, "Error sending the request")
+ }
+ return t.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
+}
+
+func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (handled bool, err error) {
+ switch message := message.(type) {
+ case readWriteModel.BVLCExactly:
+ bvlc := message
+ var npdu readWriteModel.NPDU
+ if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
+ npdu = npduRetriever.GetNpdu()
+ } else {
+ log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
+ return false, nil
+ }
+ if npdu.GetControl().GetMessageTypeFieldPresent() {
+ log.Trace().Msg("No message type field present")
+ return false, nil
+ }
+ var entryForDestination = NoDeviceEntry
+ if npdu.GetControl().GetDestinationSpecified() {
+ var err error
+ if entryForDestination, err = t.deviceInventory.getEntryForDestination(npdu.GetDestinationAddress()); err != nil {
+ // Get information from the device first
+ // TODO: get information with who-has maybe or directed... not sure now
+ // TODO: set entry once received
+ }
+ }
+ // TODO: should we continue if we don't have a destination
+ _ = entryForDestination
+ apdu := npdu.GetApdu()
+ switch apdu := apdu.(type) {
+ case readWriteModel.APDUConfirmedRequestExactly:
+ // TODO: check if adpu length is the magic number (it should be "unencoded")
+ maximumApduLengthForDevice := entryForDestination.MaximumApduLengthAcceptedLength.GetActualValue()
+ apduLengthDoesOverflow := uint64(apdu.GetLengthInBytes()) > maximumApduLengthForDevice
+ if apduLengthDoesOverflow && !entryForDestination.SegmentationSupported {
+ return false, errors.Errorf("We have a overflow. We need %d bytes, but device only supports a max of %d and no segmentation", apdu.GetLengthInBytes(), maximumApduLengthForDevice)
+ }
+ // TODO: handle potential retry
+ if apduLengthDoesOverflow {
+ // TODO: handle potential segmentation
+ }
+ return false, nil
+ case readWriteModel.APDUComplexAckExactly:
+ // TODO: check if adpu length is the magic number (it should be "unencoded")
+ maximumApduLengthForDevice := entryForDestination.MaximumApduLengthAcceptedLength.GetActualValue()
+ apduLengthDoesOverflow := uint64(apdu.GetLengthInBytes()) > maximumApduLengthForDevice
+ if apduLengthDoesOverflow && !entryForDestination.SegmentationSupported {
+ return false, errors.Errorf("We have a overflow. We need %d bytes, but device only supports a max of %d and no segmentation", apdu.GetLengthInBytes(), maximumApduLengthForDevice)
+ }
+ if apduLengthDoesOverflow {
+ // TODO: handle potential segmentation
+ }
+ return false, nil
+ default:
+ log.Trace().Msgf("APDU type not relevant %T present", apdu)
+ return false, nil
+ }
+ default:
+ log.Trace().Msgf("Message type not relevant %T present", message)
+ return false, nil
+ }
+}