You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/15 15:28:13 UTC
[plc4x] 01/02: refactor(plc4go/bacnet): restructure code to hook in application layer
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit c5639a65fddbfa6f159649330ebe0cf31742e928
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Nov 14 17:03:26 2022 +0100
refactor(plc4go/bacnet): restructure code to hook in application layer
---
plc4go/internal/bacnetip/ApplicationModule.go | 3 +-
plc4go/internal/bacnetip/Device.go | 25 +++-
plc4go/internal/bacnetip/DeviceInventory.go | 96 --------------
plc4go/internal/bacnetip/Driver.go | 84 ++++++------
plc4go/internal/bacnetip/MessageCodec.go | 65 +++++++++-
.../internal/bacnetip/TransactionStateMachine.go | 141 ---------------------
6 files changed, 127 insertions(+), 287 deletions(-)
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 0ae673addd..41d0107507 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -27,6 +27,7 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"hash/fnv"
+ "net"
)
type DeviceInfo struct {
@@ -232,7 +233,7 @@ type BIPSimpleApplication struct {
mux *UDPMultiplexer
}
-func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
b := &BIPSimpleApplication{}
var err error
b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go
index cf0e825ddd..33c36205e6 100644
--- a/plc4go/internal/bacnetip/Device.go
+++ b/plc4go/internal/bacnetip/Device.go
@@ -29,6 +29,19 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
return nil, nil
}
+var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
+var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
+
+// _LocalDeviceObjectDefault is a device entry with default entries
+var _LocalDeviceObjectDefault = LocalDeviceObject{
+ MaximumApduLengthAccepted: &defaultMaxApduLength,
+ SegmentationSupported: readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
+ MaxSegmentsAccepted: &defaultMaxSegmentsAccepted,
+ APDUSegmentTimeout: 5000,
+ APDUTimeout: 3000,
+ NumberOfAPDURetries: 3,
+}
+
type LocalDeviceObject struct {
NumberOfAPDURetries uint
APDUTimeout uint
@@ -40,11 +53,11 @@ type LocalDeviceObject struct {
func NewLocalDeviceObject() *LocalDeviceObject {
return &LocalDeviceObject{
- NumberOfAPDURetries: 0,
- APDUTimeout: 0,
- SegmentationSupported: 0,
- APDUSegmentTimeout: 0,
- MaxSegmentsAccepted: nil,
- MaximumApduLengthAccepted: nil,
+ NumberOfAPDURetries: _LocalDeviceObjectDefault.NumberOfAPDURetries,
+ APDUTimeout: _LocalDeviceObjectDefault.APDUTimeout,
+ SegmentationSupported: _LocalDeviceObjectDefault.SegmentationSupported,
+ APDUSegmentTimeout: _LocalDeviceObjectDefault.APDUSegmentTimeout,
+ MaxSegmentsAccepted: _LocalDeviceObjectDefault.MaxSegmentsAccepted,
+ MaximumApduLengthAccepted: _LocalDeviceObjectDefault.MaximumApduLengthAccepted,
}
}
diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go
deleted file mode 100644
index 86f3f5c690..0000000000
--- a/plc4go/internal/bacnetip/DeviceInventory.go
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import (
- readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
- "github.com/pkg/errors"
- "sync"
- "time"
-)
-
-// TODO: migrate into device info cache
-type DeviceInventory struct {
- sync.RWMutex
- devices map[string]DeviceEntry
-}
-
-func (d *DeviceInventory) getEntryForDestination(destination []uint8) (*DeviceEntry, error) {
- d.RLock()
- defer d.RUnlock()
- deviceKey := string(destination)
- deviceEntry, ok := d.devices[deviceKey]
- if !ok {
- return nil, errors.Errorf("no entry found for device key %s", deviceKey)
- }
- return &deviceEntry, nil
-}
-
-var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
-var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
-
-// DeviceEntryDefault is a device entry with default entries
-var DeviceEntryDefault = DeviceEntry{
- DeviceIdentifier: nil,
- MaximumApduLengthAccepted: &defaultMaxApduLength,
- MaximumNpduLength: nil, //note as we are ip we don't care about this
- SegmentationSupported: readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
- MaxSegmentsAccepted: &defaultMaxSegmentsAccepted,
- APDUSegmentTimeout: 5000,
- APDUTimeout: 3000,
- NumberOfAPDURetries: 3,
-}
-
-// TODO: switch that to a pointer and all entries that might be missing too
-type DeviceEntry struct {
- DeviceIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier
- MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
- MaximumNpduLength *uint
- SegmentationSupported readWriteModel.BACnetSegmentation
- MaxSegmentsAccepted *readWriteModel.MaxSegmentsAccepted
- APDUSegmentTimeout uint
- APDUTimeout uint
- NumberOfAPDURetries uint
- VendorId readWriteModel.BACnetVendorId
- DeviceObjects []DeviceObject
-}
-
-func (d DeviceEntry) GetDeviceObjects(filter ...DeviceObjectFilter) []DeviceObject {
- var deviceObjects []DeviceObject
- for _, object := range d.DeviceObjects {
- shouldBeAdded := true
- for _, objectFilter := range filter {
- shouldBeAdded = shouldBeAdded && objectFilter(object)
- }
- if shouldBeAdded {
- deviceObjects = append(deviceObjects, object)
- }
- }
- return deviceObjects
-}
-
-type DeviceObjectFilter func(DeviceObject) bool
-
-type DeviceObject struct {
- ObjectName string
- ObjectIdentifier readWriteModel.BACnetTagPayloadObjectIdentifier
- CachedObjectValue interface{}
- TimeOfCache time.Time
-}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index a37aca13d7..330939d8bb 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -26,6 +26,7 @@ import (
"net"
"net/url"
"strconv"
+ "sync"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -41,15 +42,16 @@ import (
type Driver struct {
_default.DefaultDriver
+ applicationManager ApplicationManager
tm spi.RequestTransactionManager
awaitSetupComplete bool
awaitDisconnectComplete bool
- DeviceInventory DeviceInventory
}
func NewDriver() plc4go.PlcDriver {
return &Driver{
DefaultDriver: _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewTagHandler()),
+ applicationManager: ApplicationManager{},
tm: *spi.NewRequestTransactionManager(math.MaxInt),
awaitSetupComplete: true,
awaitDisconnectComplete: true,
@@ -87,44 +89,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
return ch
}
- var localAddr *net.UDPAddr
- {
- host := transportUrl.Host
- port := transportUrl.Port()
- if transportUrl.Port() == "" {
- port = options["defaultUdpPort"][0]
- }
- var remoteAddr *net.UDPAddr
- if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
- panic(err)
- } else {
- remoteAddr = resolvedRemoteAddr
- }
- if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil {
- log.Error().Stringer("transportUrl", &transportUrl).Msg("host unreachable")
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host))
- }()
- return ch
- } else {
- localAddr = dial.LocalAddr().(*net.UDPAddr)
- localAddr.Port, _ = strconv.Atoi(port)
- _ = dial.Close()
- }
- }
- // Have the transport create a new transport-instance.
- transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddr)
- if err != nil {
- log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultUdpPort"])
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %v", transportUrl))
- }()
- return ch
- }
-
- codec := NewApplicationLayerMessageCodec(transportInstance, &m.DeviceInventory)
+ codec, _ := m.applicationManager.getApplicationLayerMessageCode(udpTransport, transportUrl, options)
log.Debug().Msgf("working with codec %#v", codec)
// Create the new connection
@@ -144,3 +109,44 @@ func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem), discov
func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
}
+
+type ApplicationManager struct {
+ sync.Mutex
+ applications map[string]spi.MessageCodec
+}
+
+func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) {
+ var localAddress *net.UDPAddr
+ {
+ host := transportUrl.Host
+ port := transportUrl.Port()
+ if transportUrl.Port() == "" {
+ port = options["defaultUdpPort"][0]
+ }
+ var remoteAddr *net.UDPAddr
+ if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
+ panic(err)
+ } else {
+ remoteAddr = resolvedRemoteAddr
+ }
+ if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil {
+ return nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host)
+ } else {
+ localAddress = dial.LocalAddr().(*net.UDPAddr)
+ localAddress.Port, _ = strconv.Atoi(port)
+ _ = dial.Close()
+ }
+ }
+ a.Lock()
+ defer a.Unlock()
+ messageCodec, ok := a.applications[localAddress.String()]
+ if !ok {
+ newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating application layer code")
+ }
+ a.applications[localAddress.String()] = newMessageCodec
+ return newMessageCodec, nil
+ }
+ return messageCodec, nil
+}
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 528884a7e8..670cc00ddc 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -20,23 +20,80 @@
package bacnetip
import (
+ "context"
"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
"github.com/apache/plc4x/plc4go/spi"
"github.com/apache/plc4x/plc4go/spi/default"
"github.com/apache/plc4x/plc4go/spi/transports"
+ "github.com/apache/plc4x/plc4go/spi/transports/udp"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
+ "net"
+ "net/url"
+ "time"
)
// ApplicationLayerMessageCodec is a wrapper for MessageCodec which takes care of segmentation, retries etc.
type ApplicationLayerMessageCodec struct {
- TransactionStateMachine
+ bipSimpleApplication *BIPSimpleApplication
+ messageCode *MessageCodec
+ deviceInfoCache DeviceInfoCache
}
-func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec {
- return &ApplicationLayerMessageCodec{
- NewTransactionStateMachine(NewMessageCodec(transportInstance), deviceInventory),
+func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
+ // Have the transport create a new transport-instance.
+ transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
+ if err != nil {
+ return nil, errors.Wrap(err, "error creating transport instance")
+ }
+ _ = transportInstance
+ a := &ApplicationLayerMessageCodec{}
+ application, err := NewBIPSimpleApplication(LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
+ if err != nil {
+ return nil, err
}
+ a.bipSimpleApplication = application
+ a.messageCode = NewMessageCodec(transportInstance)
+ return a, nil
+}
+
+func (m *ApplicationLayerMessageCodec) GetCodec() spi.MessageCodec {
+ return m
+}
+
+func (m *ApplicationLayerMessageCodec) Connect() error {
+ return m.messageCode.Connect()
+}
+
+func (m *ApplicationLayerMessageCodec) ConnectWithContext(ctx context.Context) error {
+ return m.messageCode.ConnectWithContext(ctx)
+}
+
+func (m *ApplicationLayerMessageCodec) Disconnect() error {
+ if err := m.bipSimpleApplication.Close(); err != nil {
+ log.Error().Err(err).Msg("error closing application")
+ }
+ return m.messageCode.Disconnect()
+}
+
+func (m *ApplicationLayerMessageCodec) IsRunning() bool {
+ return m.messageCode.IsRunning()
+}
+
+func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
+ panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+ panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+ panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) GetDefaultIncomingMessageChannel() chan spi.Message {
+ return m.messageCode.GetDefaultIncomingMessageChannel()
}
type MessageCodec struct {
diff --git a/plc4go/internal/bacnetip/TransactionStateMachine.go b/plc4go/internal/bacnetip/TransactionStateMachine.go
deleted file mode 100644
index 2298b8f231..0000000000
--- a/plc4go/internal/bacnetip/TransactionStateMachine.go
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import (
- "context"
- readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
- "github.com/apache/plc4x/plc4go/spi"
- "github.com/pkg/errors"
- "github.com/rs/zerolog/log"
- "time"
-)
-
-// TransactionStateMachine is the implementation of the bacnet transaction state machine
-type TransactionStateMachine struct {
- *MessageCodec
- deviceInventory *DeviceInventory
- retryCount int
- segmentRetryCount int
- duplicateCount int
- sentAllSegments bool
- lastSequenceNumber int
- initialSequenceNumber int
- actualWindowSize int
- proposeWindowSize int
- segmentTimer int
- RequestTimer int
-}
-
-func NewTransactionStateMachine(messageCodec *MessageCodec, deviceInventory *DeviceInventory) TransactionStateMachine {
- return TransactionStateMachine{
- MessageCodec: messageCodec,
- deviceInventory: deviceInventory,
- retryCount: 3,
- segmentRetryCount: 3,
- duplicateCount: 0,
- sentAllSegments: false,
- lastSequenceNumber: 0,
- initialSequenceNumber: 0,
- actualWindowSize: 0,
- proposeWindowSize: 2,
- segmentTimer: 1500,
- RequestTimer: 3000,
- }
-}
-
-func (t *TransactionStateMachine) GetCodec() spi.MessageCodec {
- return t
-}
-
-func (t *TransactionStateMachine) Send(message spi.Message) error {
- if handled, err := t.handleOutboundMessage(message); handled {
- return nil
- } else if err != nil {
- return errors.Wrap(err, "Error handling message")
- } else {
- return t.MessageCodec.Send(message)
- }
-}
-
-func (t *TransactionStateMachine) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
- // TODO: detect overflow
- return t.MessageCodec.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
-}
-
-func (t *TransactionStateMachine) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
- // Note: this code is copied on purpose from default codec as we want to call "this" `Send` and `Expect`
- if err := ctx.Err(); err != nil {
- return errors.Wrap(err, "Not sending message as context is aborted")
- }
- log.Trace().Msg("Sending request")
- // Send the actual message
- err := t.Send(message)
- if err != nil {
- return errors.Wrap(err, "Error sending the request")
- }
- return t.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
-}
-
-func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (handled bool, err error) {
- switch message := message.(type) {
- case readWriteModel.BVLCExactly:
- bvlc := message
- var npdu readWriteModel.NPDU
- if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
- npdu = npduRetriever.GetNpdu()
- } else {
- log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
- return false, nil
- }
- if npdu.GetControl().GetMessageTypeFieldPresent() {
- log.Trace().Msg("Message type field present")
- return false, nil
- }
- var entryForDestination = DeviceEntryDefault
- if npdu.GetControl().GetDestinationSpecified() {
- if retrievedEntry, err := t.deviceInventory.getEntryForDestination(npdu.GetDestinationAddress()); err != nil {
- // Get information from the device first
- // TODO: get information with who-has maybe or directed... not sure now
- // TODO: set entry once received
- _ = retrievedEntry
- }
- }
- // TODO: should we continue if we don't have a destination
- _ = entryForDestination
- apdu := npdu.GetApdu()
- switch apdu := apdu.(type) {
- case readWriteModel.APDUConfirmedRequestExactly:
- // TODO: this is a "client" request
- // TODO: check if adpu length is the magic number (it should be "unencoded")
- return false, nil
- case readWriteModel.APDUComplexAckExactly:
- // TODO: this is a "server" response
- // TODO: check if adpu length is the magic number (it should be "unencoded")
- return false, nil
- default:
- log.Trace().Msgf("APDU type not relevant %T present", apdu)
- return false, nil
- }
- default:
- log.Trace().Msgf("Message type not relevant %T present", message)
- return false, nil
- }
-}