You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/15 15:28:13 UTC

[plc4x] 01/02: refactor(plc4go/bacnet): restructure code to hook in application layer

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit c5639a65fddbfa6f159649330ebe0cf31742e928
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Nov 14 17:03:26 2022 +0100

    refactor(plc4go/bacnet): restructure code to hook in application layer
---
 plc4go/internal/bacnetip/ApplicationModule.go      |   3 +-
 plc4go/internal/bacnetip/Device.go                 |  25 +++-
 plc4go/internal/bacnetip/DeviceInventory.go        |  96 --------------
 plc4go/internal/bacnetip/Driver.go                 |  84 ++++++------
 plc4go/internal/bacnetip/MessageCodec.go           |  65 +++++++++-
 .../internal/bacnetip/TransactionStateMachine.go   | 141 ---------------------
 6 files changed, 127 insertions(+), 287 deletions(-)

diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 0ae673addd..41d0107507 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"hash/fnv"
+	"net"
 )
 
 type DeviceInfo struct {
@@ -232,7 +233,7 @@ type BIPSimpleApplication struct {
 	mux          *UDPMultiplexer
 }
 
-func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
 	b := &BIPSimpleApplication{}
 	var err error
 	b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go
index cf0e825ddd..33c36205e6 100644
--- a/plc4go/internal/bacnetip/Device.go
+++ b/plc4go/internal/bacnetip/Device.go
@@ -29,6 +29,19 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
 	return nil, nil
 }
 
+var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
+var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
+
+// _LocalDeviceObjectDefault is a device entry with default entries
+var _LocalDeviceObjectDefault = LocalDeviceObject{
+	MaximumApduLengthAccepted: &defaultMaxApduLength,
+	SegmentationSupported:     readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
+	MaxSegmentsAccepted:       &defaultMaxSegmentsAccepted,
+	APDUSegmentTimeout:        5000,
+	APDUTimeout:               3000,
+	NumberOfAPDURetries:       3,
+}
+
 type LocalDeviceObject struct {
 	NumberOfAPDURetries       uint
 	APDUTimeout               uint
@@ -40,11 +53,11 @@ type LocalDeviceObject struct {
 
 func NewLocalDeviceObject() *LocalDeviceObject {
 	return &LocalDeviceObject{
-		NumberOfAPDURetries:       0,
-		APDUTimeout:               0,
-		SegmentationSupported:     0,
-		APDUSegmentTimeout:        0,
-		MaxSegmentsAccepted:       nil,
-		MaximumApduLengthAccepted: nil,
+		NumberOfAPDURetries:       _LocalDeviceObjectDefault.NumberOfAPDURetries,
+		APDUTimeout:               _LocalDeviceObjectDefault.APDUTimeout,
+		SegmentationSupported:     _LocalDeviceObjectDefault.SegmentationSupported,
+		APDUSegmentTimeout:        _LocalDeviceObjectDefault.APDUSegmentTimeout,
+		MaxSegmentsAccepted:       _LocalDeviceObjectDefault.MaxSegmentsAccepted,
+		MaximumApduLengthAccepted: _LocalDeviceObjectDefault.MaximumApduLengthAccepted,
 	}
 }
diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go
deleted file mode 100644
index 86f3f5c690..0000000000
--- a/plc4go/internal/bacnetip/DeviceInventory.go
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import (
-	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
-	"github.com/pkg/errors"
-	"sync"
-	"time"
-)
-
-// TODO: migrate into device info cache
-type DeviceInventory struct {
-	sync.RWMutex
-	devices map[string]DeviceEntry
-}
-
-func (d *DeviceInventory) getEntryForDestination(destination []uint8) (*DeviceEntry, error) {
-	d.RLock()
-	defer d.RUnlock()
-	deviceKey := string(destination)
-	deviceEntry, ok := d.devices[deviceKey]
-	if !ok {
-		return nil, errors.Errorf("no entry found for device key %s", deviceKey)
-	}
-	return &deviceEntry, nil
-}
-
-var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
-var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
-
-// DeviceEntryDefault is a device entry with default entries
-var DeviceEntryDefault = DeviceEntry{
-	DeviceIdentifier:          nil,
-	MaximumApduLengthAccepted: &defaultMaxApduLength,
-	MaximumNpduLength:         nil, //note as we are ip we don't care about this
-	SegmentationSupported:     readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
-	MaxSegmentsAccepted:       &defaultMaxSegmentsAccepted,
-	APDUSegmentTimeout:        5000,
-	APDUTimeout:               3000,
-	NumberOfAPDURetries:       3,
-}
-
-// TODO: switch that to a pointer and all entries that might be missing too
-type DeviceEntry struct {
-	DeviceIdentifier          readWriteModel.BACnetTagPayloadObjectIdentifier
-	MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
-	MaximumNpduLength         *uint
-	SegmentationSupported     readWriteModel.BACnetSegmentation
-	MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
-	APDUSegmentTimeout        uint
-	APDUTimeout               uint
-	NumberOfAPDURetries       uint
-	VendorId                  readWriteModel.BACnetVendorId
-	DeviceObjects             []DeviceObject
-}
-
-func (d DeviceEntry) GetDeviceObjects(filter ...DeviceObjectFilter) []DeviceObject {
-	var deviceObjects []DeviceObject
-	for _, object := range d.DeviceObjects {
-		shouldBeAdded := true
-		for _, objectFilter := range filter {
-			shouldBeAdded = shouldBeAdded && objectFilter(object)
-		}
-		if shouldBeAdded {
-			deviceObjects = append(deviceObjects, object)
-		}
-	}
-	return deviceObjects
-}
-
-type DeviceObjectFilter func(DeviceObject) bool
-
-type DeviceObject struct {
-	ObjectName        string
-	ObjectIdentifier  readWriteModel.BACnetTagPayloadObjectIdentifier
-	CachedObjectValue interface{}
-	TimeOfCache       time.Time
-}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index a37aca13d7..330939d8bb 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -26,6 +26,7 @@ import (
 	"net"
 	"net/url"
 	"strconv"
+	"sync"
 
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -41,15 +42,16 @@ import (
 
 type Driver struct {
 	_default.DefaultDriver
+	applicationManager      ApplicationManager
 	tm                      spi.RequestTransactionManager
 	awaitSetupComplete      bool
 	awaitDisconnectComplete bool
-	DeviceInventory         DeviceInventory
 }
 
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewTagHandler()),
+		applicationManager:      ApplicationManager{},
 		tm:                      *spi.NewRequestTransactionManager(math.MaxInt),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
@@ -87,44 +89,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
 		return ch
 	}
 
-	var localAddr *net.UDPAddr
-	{
-		host := transportUrl.Host
-		port := transportUrl.Port()
-		if transportUrl.Port() == "" {
-			port = options["defaultUdpPort"][0]
-		}
-		var remoteAddr *net.UDPAddr
-		if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
-			panic(err)
-		} else {
-			remoteAddr = resolvedRemoteAddr
-		}
-		if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil {
-			log.Error().Stringer("transportUrl", &transportUrl).Msg("host unreachable")
-			ch := make(chan plc4go.PlcConnectionConnectResult)
-			go func() {
-				ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host))
-			}()
-			return ch
-		} else {
-			localAddr = dial.LocalAddr().(*net.UDPAddr)
-			localAddr.Port, _ = strconv.Atoi(port)
-			_ = dial.Close()
-		}
-	}
-	// Have the transport create a new transport-instance.
-	transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddr)
-	if err != nil {
-		log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultUdpPort"])
-		ch := make(chan plc4go.PlcConnectionConnectResult)
-		go func() {
-			ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %v", transportUrl))
-		}()
-		return ch
-	}
-
-	codec := NewApplicationLayerMessageCodec(transportInstance, &m.DeviceInventory)
+	codec, _ := m.applicationManager.getApplicationLayerMessageCode(udpTransport, transportUrl, options)
 	log.Debug().Msgf("working with codec %#v", codec)
 
 	// Create the new connection
@@ -144,3 +109,44 @@ func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem), discov
 func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
 	return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
 }
+
+type ApplicationManager struct {
+	sync.Mutex
+	applications map[string]spi.MessageCodec
+}
+
+func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) {
+	var localAddress *net.UDPAddr
+	{
+		host := transportUrl.Host
+		port := transportUrl.Port()
+		if transportUrl.Port() == "" {
+			port = options["defaultUdpPort"][0]
+		}
+		var remoteAddr *net.UDPAddr
+		if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
+			panic(err)
+		} else {
+			remoteAddr = resolvedRemoteAddr
+		}
+		if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil {
+			return nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host)
+		} else {
+			localAddress = dial.LocalAddr().(*net.UDPAddr)
+			localAddress.Port, _ = strconv.Atoi(port)
+			_ = dial.Close()
+		}
+	}
+	a.Lock()
+	defer a.Unlock()
+	messageCodec, ok := a.applications[localAddress.String()]
+	if !ok {
+		newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress)
+		if err != nil {
+			return nil, errors.Wrap(err, "error creating application layer code")
+		}
+		a.applications[localAddress.String()] = newMessageCodec
+		return newMessageCodec, nil
+	}
+	return messageCodec, nil
+}
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 528884a7e8..670cc00ddc 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -20,23 +20,80 @@
 package bacnetip
 
 import (
+	"context"
 	"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/transports"
+	"github.com/apache/plc4x/plc4go/spi/transports/udp"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
+	"net"
+	"net/url"
+	"time"
 )
 
 // ApplicationLayerMessageCodec is a wrapper for MessageCodec which takes care of segmentation, retries etc.
 type ApplicationLayerMessageCodec struct {
-	TransactionStateMachine
+	bipSimpleApplication *BIPSimpleApplication
+	messageCode          *MessageCodec
+	deviceInfoCache      DeviceInfoCache
 }
 
-func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec {
-	return &ApplicationLayerMessageCodec{
-		NewTransactionStateMachine(NewMessageCodec(transportInstance), deviceInventory),
+func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
+	// Have the transport create a new transport-instance.
+	transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating transport instance")
+	}
+	_ = transportInstance
+	a := &ApplicationLayerMessageCodec{}
+	application, err := NewBIPSimpleApplication(LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
+	if err != nil {
+		return nil, err
 	}
+	a.bipSimpleApplication = application
+	a.messageCode = NewMessageCodec(transportInstance)
+	return a, nil
+}
+
+func (m *ApplicationLayerMessageCodec) GetCodec() spi.MessageCodec {
+	return m
+}
+
+func (m *ApplicationLayerMessageCodec) Connect() error {
+	return m.messageCode.Connect()
+}
+
+func (m *ApplicationLayerMessageCodec) ConnectWithContext(ctx context.Context) error {
+	return m.messageCode.ConnectWithContext(ctx)
+}
+
+func (m *ApplicationLayerMessageCodec) Disconnect() error {
+	if err := m.bipSimpleApplication.Close(); err != nil {
+		log.Error().Err(err).Msg("error closing application")
+	}
+	return m.messageCode.Disconnect()
+}
+
+func (m *ApplicationLayerMessageCodec) IsRunning() bool {
+	return m.messageCode.IsRunning()
+}
+
+func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
+	panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+	panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+	panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) GetDefaultIncomingMessageChannel() chan spi.Message {
+	return m.messageCode.GetDefaultIncomingMessageChannel()
 }
 
 type MessageCodec struct {
diff --git a/plc4go/internal/bacnetip/TransactionStateMachine.go b/plc4go/internal/bacnetip/TransactionStateMachine.go
deleted file mode 100644
index 2298b8f231..0000000000
--- a/plc4go/internal/bacnetip/TransactionStateMachine.go
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import (
-	"context"
-	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
-	"github.com/apache/plc4x/plc4go/spi"
-	"github.com/pkg/errors"
-	"github.com/rs/zerolog/log"
-	"time"
-)
-
-// TransactionStateMachine is the implementation of the bacnet transaction state machine
-type TransactionStateMachine struct {
-	*MessageCodec
-	deviceInventory       *DeviceInventory
-	retryCount            int
-	segmentRetryCount     int
-	duplicateCount        int
-	sentAllSegments       bool
-	lastSequenceNumber    int
-	initialSequenceNumber int
-	actualWindowSize      int
-	proposeWindowSize     int
-	segmentTimer          int
-	RequestTimer          int
-}
-
-func NewTransactionStateMachine(messageCodec *MessageCodec, deviceInventory *DeviceInventory) TransactionStateMachine {
-	return TransactionStateMachine{
-		MessageCodec:          messageCodec,
-		deviceInventory:       deviceInventory,
-		retryCount:            3,
-		segmentRetryCount:     3,
-		duplicateCount:        0,
-		sentAllSegments:       false,
-		lastSequenceNumber:    0,
-		initialSequenceNumber: 0,
-		actualWindowSize:      0,
-		proposeWindowSize:     2,
-		segmentTimer:          1500,
-		RequestTimer:          3000,
-	}
-}
-
-func (t *TransactionStateMachine) GetCodec() spi.MessageCodec {
-	return t
-}
-
-func (t *TransactionStateMachine) Send(message spi.Message) error {
-	if handled, err := t.handleOutboundMessage(message); handled {
-		return nil
-	} else if err != nil {
-		return errors.Wrap(err, "Error handling message")
-	} else {
-		return t.MessageCodec.Send(message)
-	}
-}
-
-func (t *TransactionStateMachine) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
-	// TODO: detect overflow
-	return t.MessageCodec.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
-}
-
-func (t *TransactionStateMachine) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
-	// Note: this code is copied on purpose from default codec as we want to call "this" `Send` and `Expect`
-	if err := ctx.Err(); err != nil {
-		return errors.Wrap(err, "Not sending message as context is aborted")
-	}
-	log.Trace().Msg("Sending request")
-	// Send the actual message
-	err := t.Send(message)
-	if err != nil {
-		return errors.Wrap(err, "Error sending the request")
-	}
-	return t.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
-}
-
-func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (handled bool, err error) {
-	switch message := message.(type) {
-	case readWriteModel.BVLCExactly:
-		bvlc := message
-		var npdu readWriteModel.NPDU
-		if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
-			npdu = npduRetriever.GetNpdu()
-		} else {
-			log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
-			return false, nil
-		}
-		if npdu.GetControl().GetMessageTypeFieldPresent() {
-			log.Trace().Msg("Message type field present")
-			return false, nil
-		}
-		var entryForDestination = DeviceEntryDefault
-		if npdu.GetControl().GetDestinationSpecified() {
-			if retrievedEntry, err := t.deviceInventory.getEntryForDestination(npdu.GetDestinationAddress()); err != nil {
-				// Get information from the device first
-				// TODO: get information with who-has maybe or directed... not sure now
-				// TODO: set entry once received
-				_ = retrievedEntry
-			}
-		}
-		// TODO: should we continue if we don't have a destination
-		_ = entryForDestination
-		apdu := npdu.GetApdu()
-		switch apdu := apdu.(type) {
-		case readWriteModel.APDUConfirmedRequestExactly:
-			// TODO: this is a "client" request
-			// TODO: check if adpu length is the magic number (it should be "unencoded")
-			return false, nil
-		case readWriteModel.APDUComplexAckExactly:
-			// TODO: this is a "server" response
-			// TODO: check if adpu length is the magic number (it should be "unencoded")
-			return false, nil
-		default:
-			log.Trace().Msgf("APDU type not relevant %T present", apdu)
-			return false, nil
-		}
-	default:
-		log.Trace().Msgf("Message type not relevant %T present", message)
-		return false, nil
-	}
-}