You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/03 15:01:06 UTC

[plc4x] branch develop updated: feat(plc4go/bacnet): initial skeleton of TransactionStateMachine

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 49551174a feat(plc4go/bacnet): initial skeleton of TransactionStateMachine
49551174a is described below

commit 49551174ac54c88b4a93c70ae4697c3ca6ab8444
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Nov 3 16:00:58 2022 +0100

    feat(plc4go/bacnet): initial skeleton of TransactionStateMachine
---
 plc4go/internal/bacnetip/DeviceInventory.go        |  82 ++++++++++++
 plc4go/internal/bacnetip/Driver.go                 |   3 +-
 plc4go/internal/bacnetip/MessageCodec.go           |  14 +++
 plc4go/internal/bacnetip/Reader.go                 |   2 +-
 .../internal/bacnetip/TransactionStateMachine.go   | 139 +++++++++++++++++++++
 5 files changed, 238 insertions(+), 2 deletions(-)

diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go
new file mode 100644
index 000000000..a06701777
--- /dev/null
+++ b/plc4go/internal/bacnetip/DeviceInventory.go
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package bacnetip
+
+import (
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+	"github.com/pkg/errors"
+	"math"
+	"sync"
+	"time"
+)
+
+// DeviceInventory is a mutex-guarded lookup table of DeviceEntry values.
+//
+// The embedded sync.RWMutex protects devices; getEntryForDestination takes the
+// read lock before it touches the map.
+// NOTE(review): no code visible here initializes or writes devices. Reading a
+// nil map is fine, but a future writer must make() it first — confirm where.
+type DeviceInventory struct {
+	sync.RWMutex
+	// devices is keyed by the destination address bytes converted to a string.
+	devices map[string]DeviceEntry
+}
+
+// getEntryForDestination looks up the DeviceEntry stored for a destination address.
+//
+// The raw address bytes, converted to a string, form the map key. The lookup
+// runs under the read lock of the inventory.
+//
+// When no entry is stored for the address, NoDeviceEntry is returned together
+// with a non-nil error, so callers always get an entry with a usable
+// MaximumApduLengthAcceptedLength.
+func (d *DeviceInventory) getEntryForDestination(destination []uint8) (DeviceEntry, error) {
+	d.RLock()
+	defer d.RUnlock()
+	deviceEntry, ok := d.devices[string(destination)]
+	if !ok {
+		// The key is binary address data, so render it as hex; %s would emit
+		// unprintable (and possibly invalid UTF-8) bytes into the error text.
+		return NoDeviceEntry, errors.Errorf("no entry found for device key %x", destination)
+	}
+	return deviceEntry, nil
+}
+
+// NoDeviceEntry is the placeholder entry that getEntryForDestination returns
+// when nothing is stored for a destination.
+//
+// Its maximum APDU length is built from a uint64 holding math.MaxUint64, and
+// SegmentationSupported is left at its zero value (false).
+// NOTE(review): the trailing 4 is presumably the encoded length argument of
+// NewBACnetTagPayloadUnsignedInteger — confirm against the generated model.
+var NoDeviceEntry = DeviceEntry{
+	MaximumApduLengthAcceptedLength: readWriteModel.NewBACnetTagPayloadUnsignedInteger(
+		nil, nil, nil, nil, nil, nil, nil,
+		func() *uint64 {
+			unlimited := uint64(math.MaxUint64)
+			return &unlimited
+		}(),
+		4,
+	),
+}
+
+// DeviceEntry holds what the inventory stores about one device.
+//
+// In the code visible here, handleOutboundMessage compares the length of an
+// outgoing APDU against MaximumApduLengthAcceptedLength and rejects an
+// over-length APDU when SegmentationSupported is false. GetDeviceObjects reads
+// DeviceObjects. DeviceIdentifier and VendorId are not read by any visible code.
+type DeviceEntry struct {
+	DeviceIdentifier                readWriteModel.BACnetTagPayloadObjectIdentifier
+	MaximumApduLengthAcceptedLength readWriteModel.BACnetTagPayloadUnsignedInteger
+	SegmentationSupported           bool
+	VendorId                        readWriteModel.BACnetVendorId
+	DeviceObjects                   []DeviceObject
+}
+
+// GetDeviceObjects returns the objects of this entry that every given filter accepts.
+//
+// Filters are applied in argument order and evaluation for an object stops at
+// the first filter that rejects it. Without filters every object is returned.
+// The result is nil when no object qualifies.
+func (d DeviceEntry) GetDeviceObjects(filter ...DeviceObjectFilter) []DeviceObject {
+	var matching []DeviceObject
+nextObject:
+	for _, candidate := range d.DeviceObjects {
+		for _, accepts := range filter {
+			if !accepts(candidate) {
+				continue nextObject
+			}
+		}
+		matching = append(matching, candidate)
+	}
+	return matching
+}
+
+// DeviceObjectFilter is a predicate over a DeviceObject; GetDeviceObjects keeps
+// an object only if every supplied filter returns true for it.
+type DeviceObjectFilter func(DeviceObject) bool
+
+// DeviceObject describes a single object listed in DeviceEntry.DeviceObjects.
+//
+// NOTE(review): no visible code reads or writes these fields. Judging by the
+// names, CachedObjectValue is a cached value and TimeOfCache the moment it was
+// cached — confirm once the cache is implemented.
+type DeviceObject struct {
+	ObjectName        string
+	ObjectIdentifier  readWriteModel.BACnetTagPayloadObjectIdentifier
+	CachedObjectValue interface{}
+	TimeOfCache       time.Time
+}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 72176c15b..fd181cad7 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -42,6 +42,7 @@ type Driver struct {
 	tm                      spi.RequestTransactionManager
 	awaitSetupComplete      bool
 	awaitDisconnectComplete bool
+	DeviceInventory         DeviceInventory
 }
 
 func NewDriver() plc4go.PlcDriver {
@@ -121,7 +122,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
 		return ch
 	}
 
-	codec := NewMessageCodec(transportInstance)
+	codec := NewApplicationLayerMessageCodec(transportInstance, &m.DeviceInventory)
 	log.Debug().Msgf("working with codec %#v", codec)
 
 	// Create the new connection
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index f8fe48d7c..8675796d8 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -28,6 +28,20 @@ import (
 	"github.com/rs/zerolog/log"
 )
 
+// ApplicationLayerMessageCodec is a wrapper for MessageCodec which is meant to take care of
+// segmentation, retries etc.
+//
+// It does so by embedding TransactionStateMachine, whose Send, Expect and SendRequest
+// methods are thereby promoted. In the code visible here segmentation and retry
+// handling are still TODOs inside TransactionStateMachine.handleOutboundMessage.
+type ApplicationLayerMessageCodec struct {
+	TransactionStateMachine
+}
+
+// NewApplicationLayerMessageCodec creates an ApplicationLayerMessageCodec.
+//
+// The embedded TransactionStateMachine wraps a fresh MessageCodec created from
+// transportInstance and keeps the given deviceInventory pointer for destination lookups.
+func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec {
+	stateMachine := TransactionStateMachine{
+		MessageCodec:    NewMessageCodec(transportInstance),
+		deviceInventory: deviceInventory,
+	}
+	return &ApplicationLayerMessageCodec{TransactionStateMachine: stateMachine}
+}
+
 // MessageCodec is the plain codec of this package, built by embedding
 // _default.DefaultCodec. TransactionStateMachine wraps it and delegates
 // Send and Expect to it.
 type MessageCodec struct {
 	_default.DefaultCodec
 }
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index a950cccb5..e2720e395 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -189,7 +189,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 			// Send the request over the wire
 			log.Trace().Msg("Send ")
 			if err := m.messageCodec.SendRequest(ctx, bvlc, func(message spi.Message) bool {
-				bvlc, ok := message.(readWriteModel.BVLC)
+				bvlc, ok := message.(readWriteModel.BVLCExactly)
 				if !ok {
 					log.Debug().Msgf("Received strange type %T", bvlc)
 					return false
diff --git a/plc4go/internal/bacnetip/TransactionStateMachine.go b/plc4go/internal/bacnetip/TransactionStateMachine.go
new file mode 100644
index 000000000..21b0c3147
--- /dev/null
+++ b/plc4go/internal/bacnetip/TransactionStateMachine.go
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package bacnetip
+
+import (
+	"context"
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+	"github.com/apache/plc4x/plc4go/spi"
+	"github.com/pkg/errors"
+	"github.com/rs/zerolog/log"
+	"time"
+)
+
+// TransactionStateMachine is the implementation of the bacnet transaction state machine.
+//
+// It embeds *MessageCodec and overrides Send, Expect and SendRequest so that
+// outgoing messages pass through handleOutboundMessage first.
+type TransactionStateMachine struct {
+	*MessageCodec
+	// deviceInventory is consulted by handleOutboundMessage to find the
+	// DeviceEntry for a message's destination address.
+	deviceInventory *DeviceInventory
+	// NOTE(review): the counters, sequence numbers, window sizes and timers
+	// below are not read or written by any code visible here; they look like
+	// placeholders for the retry/segmentation logic — confirm units and
+	// semantics once that logic lands.
+	retryCount            int
+	segmentRetryCount     int
+	duplicateCount        int
+	sentAllSegments       bool
+	lastSequenceNumber    int
+	initialSequenceNumber int
+	actualWindowSize      int
+	proposeWindowSize     int
+	segmentTimer          int
+	RequestTimer          int
+}
+
+// GetCodec returns this TransactionStateMachine itself as the spi.MessageCodec,
+// not the embedded MessageCodec, so callers go through the overridden methods.
+func (t *TransactionStateMachine) GetCodec() spi.MessageCodec {
+	return t
+}
+
+// Send runs message through handleOutboundMessage and, unless that call
+// consumed the message or failed, forwards it to the embedded MessageCodec.
+//
+// It returns the wrapped error from handleOutboundMessage, nil when the message
+// was handled there, or otherwise whatever MessageCodec.Send returns.
+func (t *TransactionStateMachine) Send(message spi.Message) error {
+	handled, err := t.handleOutboundMessage(message)
+	// Look at the error first so a failure can never be masked by handled == true.
+	if err != nil {
+		return errors.Wrap(err, "Error handling message")
+	}
+	if handled {
+		return nil
+	}
+	return t.MessageCodec.Send(message)
+}
+
+// Expect registers an expectation for an incoming message by delegating,
+// unchanged, to the embedded MessageCodec. It exists as an override point;
+// overflow detection on the inbound side is still to be done.
+func (t *TransactionStateMachine) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+	// TODO: detect overflow
+	return t.MessageCodec.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
+}
+
+// SendRequest sends message and then registers an expectation for its response.
+//
+// Nothing is sent when ctx is already done. Send and Expect are invoked on this
+// TransactionStateMachine rather than on the embedded MessageCodec, so the
+// outbound handling of Send applies. Errors from the context check and from
+// Send are wrapped; the result of Expect is returned as is.
+func (t *TransactionStateMachine) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+	// Note: this code is copied on purpose from default codec as we want to call "this" `Send` and `Expect`
+	if ctxErr := ctx.Err(); ctxErr != nil {
+		return errors.Wrap(ctxErr, "Not sending message as context is aborted")
+	}
+	log.Trace().Msg("Sending request")
+	// Send the actual message
+	if sendErr := t.Send(message); sendErr != nil {
+		return errors.Wrap(sendErr, "Error sending the request")
+	}
+	return t.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
+}
+
+// handleOutboundMessage inspects an outgoing message before it is passed to the
+// embedded MessageCodec.
+//
+// Only BVLC messages that expose an NPDU, carry no message type field and hold
+// a confirmed request or complex ack APDU are examined. For those, the APDU
+// length is checked against the maximum APDU length of the destination's
+// DeviceEntry (NoDeviceEntry when no destination is specified or none is known).
+//
+// handled is currently always false, i.e. the caller still has to send the
+// message itself. err is non-nil when the APDU is longer than the device
+// accepts and the device does not support segmentation.
+func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (handled bool, err error) {
+	bvlc, ok := message.(readWriteModel.BVLCExactly)
+	if !ok {
+		log.Trace().Msgf("Message type not relevant %T present", message)
+		return false, nil
+	}
+	npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU })
+	if !ok {
+		log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
+		return false, nil
+	}
+	npdu := npduRetriever.GetNpdu()
+	if npdu.GetControl().GetMessageTypeFieldPresent() {
+		// Messages that carry a message type field are passed through untouched.
+		log.Trace().Msg("Message type field present")
+		return false, nil
+	}
+
+	entryForDestination := NoDeviceEntry
+	if npdu.GetControl().GetDestinationSpecified() {
+		entry, lookupErr := t.deviceInventory.getEntryForDestination(npdu.GetDestinationAddress())
+		if lookupErr != nil {
+			// An unknown device is not fatal: keep the NoDeviceEntry defaults.
+			// TODO: get information with who-has maybe or directed... not sure now
+			// TODO: set entry once received
+			log.Debug().Err(lookupErr).Msg("No device entry for destination, using NoDeviceEntry")
+		} else {
+			entryForDestination = entry
+		}
+	}
+	// TODO: should we continue if we don't have a destination
+
+	switch apdu := npdu.GetApdu().(type) {
+	case readWriteModel.APDUConfirmedRequestExactly:
+		// TODO: check if apdu length is the magic number (it should be "unencoded")
+		if err = checkApduLengthForDevice(entryForDestination, uint64(apdu.GetLengthInBytes())); err != nil {
+			return false, err
+		}
+		// TODO: handle potential retry
+		// TODO: handle potential segmentation when the APDU overflows
+		return false, nil
+	case readWriteModel.APDUComplexAckExactly:
+		// TODO: check if apdu length is the magic number (it should be "unencoded")
+		if err = checkApduLengthForDevice(entryForDestination, uint64(apdu.GetLengthInBytes())); err != nil {
+			return false, err
+		}
+		// TODO: handle potential segmentation when the APDU overflows
+		return false, nil
+	default:
+		log.Trace().Msgf("APDU type not relevant %T present", apdu)
+		return false, nil
+	}
+}
+
+// checkApduLengthForDevice returns an error when an APDU of apduLength bytes
+// exceeds the maximum APDU length of entry and entry does not support
+// segmentation; otherwise it returns nil.
+func checkApduLengthForDevice(entry DeviceEntry, apduLength uint64) error {
+	maximumApduLengthForDevice := entry.MaximumApduLengthAcceptedLength.GetActualValue()
+	if apduLength > maximumApduLengthForDevice && !entry.SegmentationSupported {
+		return errors.Errorf("We have a overflow. We need %d bytes, but device only supports a max of %d and no segmentation", apduLength, maximumApduLengthForDevice)
+	}
+	return nil
+}