You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/15 15:28:12 UTC

[plc4x] branch develop updated (a331d50ab7 -> 25c5e94163)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from a331d50ab7 build(deps): bump mockito.version from 4.8.1 to 4.9.0 (#654)
     new c5639a65fd refactor(plc4go/bacnet): restructure code to hook in application layer
     new 25c5e94163 refactor(plc4go/bacnet): added more application code for protocol

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/bacnetip/ApplicationLayer.go       | 137 ++---
 plc4go/internal/bacnetip/ApplicationModule.go      | 220 +++++++-
 plc4go/internal/bacnetip/DeviceInventory.go        |  96 ----
 plc4go/internal/bacnetip/Driver.go                 |  84 +--
 plc4go/internal/bacnetip/IOCBModule.go             | 589 +++++++++++++++++++++
 plc4go/internal/bacnetip/MessageCodec.go           |  87 ++-
 .../internal/bacnetip/TransactionStateMachine.go   | 141 -----
 plc4go/internal/bacnetip/{ => local}/Device.go     |  28 +-
 .../bacnetip/{Object.go => service/Device.go}      |   6 +-
 plc4go/internal/bacnetip/{ => service}/Object.go   |   2 +-
 10 files changed, 1005 insertions(+), 385 deletions(-)
 delete mode 100644 plc4go/internal/bacnetip/DeviceInventory.go
 create mode 100644 plc4go/internal/bacnetip/IOCBModule.go
 delete mode 100644 plc4go/internal/bacnetip/TransactionStateMachine.go
 rename plc4go/internal/bacnetip/{ => local}/Device.go (68%)
 copy plc4go/internal/bacnetip/{Object.go => service/Device.go} (86%)
 copy plc4go/internal/bacnetip/{ => service}/Object.go (98%)


[plc4x] 01/02: refactor(plc4go/bacnet): restructure code to hook in application layer

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit c5639a65fddbfa6f159649330ebe0cf31742e928
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Nov 14 17:03:26 2022 +0100

    refactor(plc4go/bacnet): restructure code to hook in application layer
---
 plc4go/internal/bacnetip/ApplicationModule.go      |   3 +-
 plc4go/internal/bacnetip/Device.go                 |  25 +++-
 plc4go/internal/bacnetip/DeviceInventory.go        |  96 --------------
 plc4go/internal/bacnetip/Driver.go                 |  84 ++++++------
 plc4go/internal/bacnetip/MessageCodec.go           |  65 +++++++++-
 .../internal/bacnetip/TransactionStateMachine.go   | 141 ---------------------
 6 files changed, 127 insertions(+), 287 deletions(-)

diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 0ae673addd..41d0107507 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"hash/fnv"
+	"net"
 )
 
 type DeviceInfo struct {
@@ -232,7 +233,7 @@ type BIPSimpleApplication struct {
 	mux          *UDPMultiplexer
 }
 
-func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
 	b := &BIPSimpleApplication{}
 	var err error
 	b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go
index cf0e825ddd..33c36205e6 100644
--- a/plc4go/internal/bacnetip/Device.go
+++ b/plc4go/internal/bacnetip/Device.go
@@ -29,6 +29,19 @@ func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
 	return nil, nil
 }
 
+var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
+var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
+
+// _LocalDeviceObjectDefault is a device entry with default entries
+var _LocalDeviceObjectDefault = LocalDeviceObject{
+	MaximumApduLengthAccepted: &defaultMaxApduLength,
+	SegmentationSupported:     readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
+	MaxSegmentsAccepted:       &defaultMaxSegmentsAccepted,
+	APDUSegmentTimeout:        5000,
+	APDUTimeout:               3000,
+	NumberOfAPDURetries:       3,
+}
+
 type LocalDeviceObject struct {
 	NumberOfAPDURetries       uint
 	APDUTimeout               uint
@@ -40,11 +53,11 @@ type LocalDeviceObject struct {
 
 func NewLocalDeviceObject() *LocalDeviceObject {
 	return &LocalDeviceObject{
-		NumberOfAPDURetries:       0,
-		APDUTimeout:               0,
-		SegmentationSupported:     0,
-		APDUSegmentTimeout:        0,
-		MaxSegmentsAccepted:       nil,
-		MaximumApduLengthAccepted: nil,
+		NumberOfAPDURetries:       _LocalDeviceObjectDefault.NumberOfAPDURetries,
+		APDUTimeout:               _LocalDeviceObjectDefault.APDUTimeout,
+		SegmentationSupported:     _LocalDeviceObjectDefault.SegmentationSupported,
+		APDUSegmentTimeout:        _LocalDeviceObjectDefault.APDUSegmentTimeout,
+		MaxSegmentsAccepted:       _LocalDeviceObjectDefault.MaxSegmentsAccepted,
+		MaximumApduLengthAccepted: _LocalDeviceObjectDefault.MaximumApduLengthAccepted,
 	}
 }
diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go
deleted file mode 100644
index 86f3f5c690..0000000000
--- a/plc4go/internal/bacnetip/DeviceInventory.go
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import (
-	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
-	"github.com/pkg/errors"
-	"sync"
-	"time"
-)
-
-// TODO: migrate into device info cache
-type DeviceInventory struct {
-	sync.RWMutex
-	devices map[string]DeviceEntry
-}
-
-func (d *DeviceInventory) getEntryForDestination(destination []uint8) (*DeviceEntry, error) {
-	d.RLock()
-	defer d.RUnlock()
-	deviceKey := string(destination)
-	deviceEntry, ok := d.devices[deviceKey]
-	if !ok {
-		return nil, errors.Errorf("no entry found for device key %s", deviceKey)
-	}
-	return &deviceEntry, nil
-}
-
-var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
-var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
-
-// DeviceEntryDefault is a device entry with default entries
-var DeviceEntryDefault = DeviceEntry{
-	DeviceIdentifier:          nil,
-	MaximumApduLengthAccepted: &defaultMaxApduLength,
-	MaximumNpduLength:         nil, //note as we are ip we don't care about this
-	SegmentationSupported:     readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
-	MaxSegmentsAccepted:       &defaultMaxSegmentsAccepted,
-	APDUSegmentTimeout:        5000,
-	APDUTimeout:               3000,
-	NumberOfAPDURetries:       3,
-}
-
-// TODO: switch that to a pointer and all entries that might be missing too
-type DeviceEntry struct {
-	DeviceIdentifier          readWriteModel.BACnetTagPayloadObjectIdentifier
-	MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
-	MaximumNpduLength         *uint
-	SegmentationSupported     readWriteModel.BACnetSegmentation
-	MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
-	APDUSegmentTimeout        uint
-	APDUTimeout               uint
-	NumberOfAPDURetries       uint
-	VendorId                  readWriteModel.BACnetVendorId
-	DeviceObjects             []DeviceObject
-}
-
-func (d DeviceEntry) GetDeviceObjects(filter ...DeviceObjectFilter) []DeviceObject {
-	var deviceObjects []DeviceObject
-	for _, object := range d.DeviceObjects {
-		shouldBeAdded := true
-		for _, objectFilter := range filter {
-			shouldBeAdded = shouldBeAdded && objectFilter(object)
-		}
-		if shouldBeAdded {
-			deviceObjects = append(deviceObjects, object)
-		}
-	}
-	return deviceObjects
-}
-
-type DeviceObjectFilter func(DeviceObject) bool
-
-type DeviceObject struct {
-	ObjectName        string
-	ObjectIdentifier  readWriteModel.BACnetTagPayloadObjectIdentifier
-	CachedObjectValue interface{}
-	TimeOfCache       time.Time
-}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index a37aca13d7..330939d8bb 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -26,6 +26,7 @@ import (
 	"net"
 	"net/url"
 	"strconv"
+	"sync"
 
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -41,15 +42,16 @@ import (
 
 type Driver struct {
 	_default.DefaultDriver
+	applicationManager      ApplicationManager
 	tm                      spi.RequestTransactionManager
 	awaitSetupComplete      bool
 	awaitDisconnectComplete bool
-	DeviceInventory         DeviceInventory
 }
 
 func NewDriver() plc4go.PlcDriver {
 	return &Driver{
 		DefaultDriver:           _default.NewDefaultDriver("bacnet-ip", "BACnet/IP", "udp", NewTagHandler()),
+		applicationManager:      ApplicationManager{},
 		tm:                      *spi.NewRequestTransactionManager(math.MaxInt),
 		awaitSetupComplete:      true,
 		awaitDisconnectComplete: true,
@@ -87,44 +89,7 @@ func (m *Driver) GetConnection(transportUrl url.URL, transports map[string]trans
 		return ch
 	}
 
-	var localAddr *net.UDPAddr
-	{
-		host := transportUrl.Host
-		port := transportUrl.Port()
-		if transportUrl.Port() == "" {
-			port = options["defaultUdpPort"][0]
-		}
-		var remoteAddr *net.UDPAddr
-		if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
-			panic(err)
-		} else {
-			remoteAddr = resolvedRemoteAddr
-		}
-		if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil {
-			log.Error().Stringer("transportUrl", &transportUrl).Msg("host unreachable")
-			ch := make(chan plc4go.PlcConnectionConnectResult)
-			go func() {
-				ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host))
-			}()
-			return ch
-		} else {
-			localAddr = dial.LocalAddr().(*net.UDPAddr)
-			localAddr.Port, _ = strconv.Atoi(port)
-			_ = dial.Close()
-		}
-	}
-	// Have the transport create a new transport-instance.
-	transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddr)
-	if err != nil {
-		log.Error().Stringer("transportUrl", &transportUrl).Msgf("We couldn't create a transport instance for port %#v", options["defaultUdpPort"])
-		ch := make(chan plc4go.PlcConnectionConnectResult)
-		go func() {
-			ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Errorf("couldn't initialize transport configuration for given transport url %v", transportUrl))
-		}()
-		return ch
-	}
-
-	codec := NewApplicationLayerMessageCodec(transportInstance, &m.DeviceInventory)
+	codec, _ := m.applicationManager.getApplicationLayerMessageCode(udpTransport, transportUrl, options)
 	log.Debug().Msgf("working with codec %#v", codec)
 
 	// Create the new connection
@@ -144,3 +109,44 @@ func (m *Driver) Discover(callback func(event apiModel.PlcDiscoveryItem), discov
 func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event apiModel.PlcDiscoveryItem), discoveryOptions ...options.WithDiscoveryOption) error {
 	return NewDiscoverer().Discover(ctx, callback, discoveryOptions...)
 }
+
+type ApplicationManager struct {
+	sync.Mutex
+	applications map[string]spi.MessageCodec
+}
+
+func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) {
+	var localAddress *net.UDPAddr
+	{
+		host := transportUrl.Host
+		port := transportUrl.Port()
+		if transportUrl.Port() == "" {
+			port = options["defaultUdpPort"][0]
+		}
+		var remoteAddr *net.UDPAddr
+		if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
+			panic(err)
+		} else {
+			remoteAddr = resolvedRemoteAddr
+		}
+		if dial, err := net.DialUDP("udp", nil, remoteAddr); err != nil {
+			return nil, errors.Errorf("couldn't dial to host %#v", transportUrl.Host)
+		} else {
+			localAddress = dial.LocalAddr().(*net.UDPAddr)
+			localAddress.Port, _ = strconv.Atoi(port)
+			_ = dial.Close()
+		}
+	}
+	a.Lock()
+	defer a.Unlock()
+	messageCodec, ok := a.applications[localAddress.String()]
+	if !ok {
+		newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress)
+		if err != nil {
+			return nil, errors.Wrap(err, "error creating application layer code")
+		}
+		a.applications[localAddress.String()] = newMessageCodec
+		return newMessageCodec, nil
+	}
+	return messageCodec, nil
+}
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 528884a7e8..670cc00ddc 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -20,23 +20,80 @@
 package bacnetip
 
 import (
+	"context"
 	"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
 	"github.com/apache/plc4x/plc4go/spi/transports"
+	"github.com/apache/plc4x/plc4go/spi/transports/udp"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
+	"net"
+	"net/url"
+	"time"
 )
 
 // ApplicationLayerMessageCodec is a wrapper for MessageCodec which takes care of segmentation, retries etc.
 type ApplicationLayerMessageCodec struct {
-	TransactionStateMachine
+	bipSimpleApplication *BIPSimpleApplication
+	messageCode          *MessageCodec
+	deviceInfoCache      DeviceInfoCache
 }
 
-func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec {
-	return &ApplicationLayerMessageCodec{
-		NewTransactionStateMachine(NewMessageCodec(transportInstance), deviceInventory),
+func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
+	// Have the transport create a new transport-instance.
+	transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating transport instance")
+	}
+	_ = transportInstance
+	a := &ApplicationLayerMessageCodec{}
+	application, err := NewBIPSimpleApplication(LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
+	if err != nil {
+		return nil, err
 	}
+	a.bipSimpleApplication = application
+	a.messageCode = NewMessageCodec(transportInstance)
+	return a, nil
+}
+
+func (m *ApplicationLayerMessageCodec) GetCodec() spi.MessageCodec {
+	return m
+}
+
+func (m *ApplicationLayerMessageCodec) Connect() error {
+	return m.messageCode.Connect()
+}
+
+func (m *ApplicationLayerMessageCodec) ConnectWithContext(ctx context.Context) error {
+	return m.messageCode.ConnectWithContext(ctx)
+}
+
+func (m *ApplicationLayerMessageCodec) Disconnect() error {
+	if err := m.bipSimpleApplication.Close(); err != nil {
+		log.Error().Err(err).Msg("error closing application")
+	}
+	return m.messageCode.Disconnect()
+}
+
+func (m *ApplicationLayerMessageCodec) IsRunning() bool {
+	return m.messageCode.IsRunning()
+}
+
+func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
+	panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+	panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+	panic("not yet mapped")
+}
+
+func (m *ApplicationLayerMessageCodec) GetDefaultIncomingMessageChannel() chan spi.Message {
+	return m.messageCode.GetDefaultIncomingMessageChannel()
 }
 
 type MessageCodec struct {
diff --git a/plc4go/internal/bacnetip/TransactionStateMachine.go b/plc4go/internal/bacnetip/TransactionStateMachine.go
deleted file mode 100644
index 2298b8f231..0000000000
--- a/plc4go/internal/bacnetip/TransactionStateMachine.go
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import (
-	"context"
-	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
-	"github.com/apache/plc4x/plc4go/spi"
-	"github.com/pkg/errors"
-	"github.com/rs/zerolog/log"
-	"time"
-)
-
-// TransactionStateMachine is the implementation of the bacnet transaction state machine
-type TransactionStateMachine struct {
-	*MessageCodec
-	deviceInventory       *DeviceInventory
-	retryCount            int
-	segmentRetryCount     int
-	duplicateCount        int
-	sentAllSegments       bool
-	lastSequenceNumber    int
-	initialSequenceNumber int
-	actualWindowSize      int
-	proposeWindowSize     int
-	segmentTimer          int
-	RequestTimer          int
-}
-
-func NewTransactionStateMachine(messageCodec *MessageCodec, deviceInventory *DeviceInventory) TransactionStateMachine {
-	return TransactionStateMachine{
-		MessageCodec:          messageCodec,
-		deviceInventory:       deviceInventory,
-		retryCount:            3,
-		segmentRetryCount:     3,
-		duplicateCount:        0,
-		sentAllSegments:       false,
-		lastSequenceNumber:    0,
-		initialSequenceNumber: 0,
-		actualWindowSize:      0,
-		proposeWindowSize:     2,
-		segmentTimer:          1500,
-		RequestTimer:          3000,
-	}
-}
-
-func (t *TransactionStateMachine) GetCodec() spi.MessageCodec {
-	return t
-}
-
-func (t *TransactionStateMachine) Send(message spi.Message) error {
-	if handled, err := t.handleOutboundMessage(message); handled {
-		return nil
-	} else if err != nil {
-		return errors.Wrap(err, "Error handling message")
-	} else {
-		return t.MessageCodec.Send(message)
-	}
-}
-
-func (t *TransactionStateMachine) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
-	// TODO: detect overflow
-	return t.MessageCodec.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
-}
-
-func (t *TransactionStateMachine) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
-	// Note: this code is copied on purpose from default codec as we want to call "this" `Send` and `Expect`
-	if err := ctx.Err(); err != nil {
-		return errors.Wrap(err, "Not sending message as context is aborted")
-	}
-	log.Trace().Msg("Sending request")
-	// Send the actual message
-	err := t.Send(message)
-	if err != nil {
-		return errors.Wrap(err, "Error sending the request")
-	}
-	return t.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
-}
-
-func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (handled bool, err error) {
-	switch message := message.(type) {
-	case readWriteModel.BVLCExactly:
-		bvlc := message
-		var npdu readWriteModel.NPDU
-		if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
-			npdu = npduRetriever.GetNpdu()
-		} else {
-			log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
-			return false, nil
-		}
-		if npdu.GetControl().GetMessageTypeFieldPresent() {
-			log.Trace().Msg("Message type field present")
-			return false, nil
-		}
-		var entryForDestination = DeviceEntryDefault
-		if npdu.GetControl().GetDestinationSpecified() {
-			if retrievedEntry, err := t.deviceInventory.getEntryForDestination(npdu.GetDestinationAddress()); err != nil {
-				// Get information from the device first
-				// TODO: get information with who-has maybe or directed... not sure now
-				// TODO: set entry once received
-				_ = retrievedEntry
-			}
-		}
-		// TODO: should we continue if we don't have a destination
-		_ = entryForDestination
-		apdu := npdu.GetApdu()
-		switch apdu := apdu.(type) {
-		case readWriteModel.APDUConfirmedRequestExactly:
-			// TODO: this is a "client" request
-			// TODO: check if adpu length is the magic number (it should be "unencoded")
-			return false, nil
-		case readWriteModel.APDUComplexAckExactly:
-			// TODO: this is a "server" response
-			// TODO: check if adpu length is the magic number (it should be "unencoded")
-			return false, nil
-		default:
-			log.Trace().Msgf("APDU type not relevant %T present", apdu)
-			return false, nil
-		}
-	default:
-		log.Trace().Msgf("Message type not relevant %T present", message)
-		return false, nil
-	}
-}


[plc4x] 02/02: refactor(plc4go/bacnet): added more application code for protocol

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 25c5e941636f66d19ae70a0704a67362a24cd51f
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Nov 15 16:28:00 2022 +0100

    refactor(plc4go/bacnet): added more application code for protocol
---
 plc4go/internal/bacnetip/ApplicationLayer.go  | 137 +++---
 plc4go/internal/bacnetip/ApplicationModule.go | 219 +++++++++-
 plc4go/internal/bacnetip/Device.go            |  63 ---
 plc4go/internal/bacnetip/Driver.go            |   8 +-
 plc4go/internal/bacnetip/IOCBModule.go        | 589 ++++++++++++++++++++++++++
 plc4go/internal/bacnetip/MessageCodec.go      |  30 +-
 plc4go/internal/bacnetip/local/Device.go      |  36 ++
 plc4go/internal/bacnetip/service/Device.go    |  28 ++
 plc4go/internal/bacnetip/service/Object.go    |  28 ++
 9 files changed, 987 insertions(+), 151 deletions(-)

diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go
index 04dc50f415..d7a5ce94f3 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -21,6 +21,7 @@ package bacnetip
 
 import (
 	"bytes"
+	"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
@@ -31,33 +32,33 @@ import (
 type SSMState uint8
 
 const (
-	IDLE SSMState = iota
-	SEGMENTED_REQUEST
-	AWAIT_CONFIRMATION
-	AWAIT_RESPONSE
-	SEGMENTED_RESPONSE
-	SEGMENTED_CONFIRMATION
-	COMPLETED
-	ABORTED
+	SSMState_IDLE SSMState = iota
+	SSMState_SEGMENTED_REQUEST
+	SSMState_AWAIT_CONFIRMATION
+	SSMState_AWAIT_RESPONSE
+	SSMState_SEGMENTED_RESPONSE
+	SSMState_SEGMENTED_CONFIRMATION
+	SSMState_COMPLETED
+	SSMState_ABORTED
 )
 
 func (s SSMState) String() string {
 	switch s {
-	case IDLE:
+	case SSMState_IDLE:
 		return "IDLE"
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return "SEGMENTED_REQUEST"
-	case AWAIT_CONFIRMATION:
+	case SSMState_AWAIT_CONFIRMATION:
 		return "AWAIT_CONFIRMATION"
-	case AWAIT_RESPONSE:
+	case SSMState_AWAIT_RESPONSE:
 		return "AWAIT_RESPONSE"
-	case SEGMENTED_RESPONSE:
+	case SSMState_SEGMENTED_RESPONSE:
 		return "SEGMENTED_RESPONSE"
-	case SEGMENTED_CONFIRMATION:
+	case SSMState_SEGMENTED_CONFIRMATION:
 		return "SEGMENTED_CONFIRMATION"
-	case COMPLETED:
+	case SSMState_COMPLETED:
 		return "COMPLETED"
-	case ABORTED:
+	case SSMState_ABORTED:
 		return "ABORTED"
 	default:
 		return "Unknown"
@@ -76,7 +77,7 @@ type SSMSAPRequirements interface {
 	_ServiceAccessPoint
 	_Client
 	GetDeviceInfoCache() *DeviceInfoCache
-	GetLocalDevice() LocalDeviceObject
+	GetLocalDevice() *local.LocalDeviceObject
 	GetProposedWindowSize() uint8
 	GetClientTransactions() []*ClientSSM
 	GetServerTransactions() []*ServerSSM
@@ -126,7 +127,7 @@ func NewSSM(sap SSMSAPRequirements, pduAddress []byte) (SSM, error) {
 		ssmSAP:                sap,
 		pduAddress:            pduAddress,
 		deviceInfo:            deviceInfo,
-		state:                 IDLE,
+		state:                 SSMState_IDLE,
 		numberOfApduRetries:   localDevice.NumberOfAPDURetries,
 		apduTimeout:           localDevice.APDUTimeout,
 		segmentationSupported: localDevice.SegmentationSupported,
@@ -163,7 +164,7 @@ func (s *SSM) restartTimer(millis uint) {
 // setState This function is called when the derived class wants to change state
 func (s *SSM) setState(newState SSMState, timer *uint) error {
 	log.Debug().Msgf("setState %s timer=%d", newState, timer)
-	if s.state == COMPLETED || s.state == ABORTED {
+	if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
 		return errors.Errorf("Invalid state transition from %s to %s", s.state, newState)
 	}
 
@@ -360,7 +361,7 @@ func (s *ClientSSM) setState(newState SSMState, timer *uint) error {
 		return errors.Wrap(err, "error during SSM state transition")
 	}
 
-	if s.state == COMPLETED || s.state == ABORTED {
+	if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
 		log.Debug().Msg("remove from active transaction")
 		s.ssmSAP.GetClientTransactions() // TODO remove "this" transaction from the list
 		if s.deviceInfo == nil {
@@ -458,7 +459,7 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 		// unsegmented
 		s.sentAllSegments = true
 		s.retryCount = 0
-		if err := s.setState(AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
+		if err := s.setState(SSMState_AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 	} else {
@@ -468,7 +469,7 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 		s.segmentRetryCount = 0
 		s.initialSequenceNumber = 0
 		s.actualWindowSize = nil
-		if err := s.setState(SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
+		if err := s.setState(SSMState_SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 	}
@@ -497,11 +498,11 @@ func (s *ClientSSM) Confirmation(apdu readWriteModel.APDU) error {
 	log.Debug().Msgf("confirmation\n%s", apdu)
 
 	switch s.state {
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return s.segmentedRequest(apdu)
-	case AWAIT_CONFIRMATION:
+	case SSMState_AWAIT_CONFIRMATION:
 		return s.awaitConfirmation(apdu)
-	case SEGMENTED_CONFIRMATION:
+	case SSMState_SEGMENTED_CONFIRMATION:
 		return s.segmentedConfirmation(apdu)
 	default:
 		return errors.Errorf("Invalid state %s", s.state)
@@ -512,13 +513,13 @@ func (s *ClientSSM) Confirmation(apdu readWriteModel.APDU) error {
 func (s *ClientSSM) processTask() error {
 	log.Debug().Msg("processTask")
 	switch s.state {
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return s.segmentedRequestTimeout()
-	case AWAIT_CONFIRMATION:
+	case SSMState_AWAIT_CONFIRMATION:
 		return s.awaitConfirmationTimeout()
-	case SEGMENTED_CONFIRMATION:
+	case SSMState_SEGMENTED_CONFIRMATION:
 		return s.segmentedConfirmationTimeout()
-	case COMPLETED, ABORTED:
+	case SSMState_COMPLETED, SSMState_ABORTED:
 		return nil
 	default:
 		return errors.Errorf("Invalid state %s", s.state)
@@ -530,7 +531,7 @@ func (s *ClientSSM) abort(reason readWriteModel.BACnetAbortReason) (readWriteMod
 	log.Debug().Msgf("abort\n%s", reason)
 
 	// change the state to aborted
-	if err := s.setState(ABORTED, nil); err != nil {
+	if err := s.setState(SSMState_ABORTED, nil); err != nil {
 		return nil, errors.Wrap(err, "Error setting state to aborted")
 	}
 
@@ -558,7 +559,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 		} else if s.sentAllSegments {
 			log.Debug().Msg("all done sending request")
 
-			if err := s.setState(AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
+			if err := s.setState(SSMState_AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 		} else {
@@ -587,7 +588,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 				log.Debug().Err(err).Msg("error sending response")
 			}
 		} else {
-			if err := s.setState(COMPLETED, nil); err != nil {
+			if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 		}
@@ -607,7 +608,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 			}
 		} else if !apdu.GetSegmentedMessage() {
 			// ack is not segmented
-			if err := s.setState(COMPLETED, nil); err != nil {
+			if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 			if err := s.Response(apdu); err != nil {
@@ -624,13 +625,13 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 			s.actualWindowSize = &minWindowSize
 			s.lastSequenceNumber = 0
 			s.initialSequenceNumber = 0
-			if err := s.setState(SEGMENTED_CONFIRMATION, &s.segmentTimeout); err != nil {
+			if err := s.setState(SSMState_SEGMENTED_CONFIRMATION, &s.segmentTimeout); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 		}
 	case readWriteModel.APDUErrorExactly:
 		log.Debug().Msg("error/reject/abort")
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 		if err := s.Response(apdu); err != nil {
@@ -685,7 +686,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
 	case readWriteModel.APDUAbortExactly:
 		log.Debug().Msg("Server aborted")
 
-		if err := s.setState(ABORTED, nil); err != nil {
+		if err := s.setState(SSMState_ABORTED, nil); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 		if err := s.Response(apdu); err != nil {
@@ -694,7 +695,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
 	case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly:
 		log.Debug().Msg("simple ack, error or reject")
 
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 		if err := s.Response(apdu); err != nil {
@@ -706,7 +707,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
 		if !apdu.GetSegmentedMessage() {
 			log.Debug().Msg("unsegmented")
 
-			if err := s.setState(COMPLETED, nil); err != nil {
+			if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 			if err := s.Response(apdu); err != nil {
@@ -733,7 +734,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
 			s.actualWindowSize = apdu.GetProposedWindowSize()
 			s.lastSequenceNumber = 0
 			s.initialSequenceNumber = 0
-			if err := s.setState(SEGMENTED_CONFIRMATION, nil); err != nil {
+			if err := s.setState(SSMState_SEGMENTED_CONFIRMATION, nil); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 
@@ -858,7 +859,7 @@ func (s *ClientSSM) segmentedConfirmation(apdu readWriteModel.APDU) error {
 			log.Debug().Err(err).Msg("error sending request")
 		}
 
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 		// TODO: this is nonsense... We need to parse the service and the apdu not sure where to get it from now...
@@ -928,7 +929,7 @@ func (s *ServerSSM) setState(newState SSMState, timer *uint) error {
 		return errors.Wrap(err, "error during SSM state transition")
 	}
 
-	if s.state == COMPLETED || s.state == ABORTED {
+	if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
 		log.Debug().Msg("remove from active transaction")
 		s.ssmSAP.GetServerTransactions() // TODO remove "this" transaction from the list
 		if s.deviceInfo != nil {
@@ -954,13 +955,13 @@ func (s *ServerSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 	// make sure we're getting confirmed requests
 
 	switch s.state {
-	case IDLE:
+	case SSMState_IDLE:
 		return s.idle(apdu)
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return s.segmentedRequest(apdu)
-	case AWAIT_RESPONSE:
+	case SSMState_AWAIT_RESPONSE:
 		return s.awaitResponse(apdu)
-	case SEGMENTED_RESPONSE:
+	case SSMState_SEGMENTED_RESPONSE:
 		return s.segmentedResponse(apdu)
 	default:
 		return errors.Errorf("invalid state %s", s.state)
@@ -984,7 +985,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 	log.Debug().Msgf("confirmation\n%s", apdu)
 
 	// check to see we are in the correct state
-	if s.state != AWAIT_RESPONSE {
+	if s.state != SSMState_AWAIT_RESPONSE {
 		log.Debug().Msg("warning: no expecting a response")
 	}
 
@@ -993,7 +994,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 	case readWriteModel.APDUAbortExactly:
 		log.Debug().Msg("abort")
 
-		if err := s.setState(ABORTED, nil); err != nil {
+		if err := s.setState(SSMState_ABORTED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 
@@ -1004,7 +1005,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 		log.Debug().Msg("simple ack, error or reject")
 
 		// transaction completed
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 
@@ -1086,7 +1087,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 				if err := s.Response(apdu); err != nil {
 					log.Debug().Err(err).Msg("error sending response")
 				}
-				if err := s.setState(COMPLETED, nil); err != nil {
+				if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 					return errors.Wrap(err, "Error setting state to aborted")
 				}
 			} else {
@@ -1097,7 +1098,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 				if err := s.Response(segment); err != nil {
 					log.Debug().Err(err).Msg("error sending response")
 				}
-				if err := s.setState(SEGMENTED_RESPONSE, nil); err != nil {
+				if err := s.setState(SSMState_SEGMENTED_RESPONSE, nil); err != nil {
 					return errors.Wrap(err, "Error setting state to aborted")
 				}
 			}
@@ -1114,13 +1115,13 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 func (s *ServerSSM) processTask() error {
 	log.Debug().Msg("processTask")
 	switch s.state {
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return s.segmentedRequestTimeout()
-	case AWAIT_CONFIRMATION:
+	case SSMState_AWAIT_CONFIRMATION:
 		return s.awaitResponseTimeout()
-	case SEGMENTED_CONFIRMATION:
+	case SSMState_SEGMENTED_CONFIRMATION:
 		return s.segmentedResponseTimeout()
-	case COMPLETED, ABORTED:
+	case SSMState_COMPLETED, SSMState_ABORTED:
 		return nil
 	default:
 		return errors.Errorf("Invalid state %s", s.state)
@@ -1132,7 +1133,7 @@ func (s *ServerSSM) abort(reason readWriteModel.BACnetAbortReason) (readWriteMod
 	log.Debug().Msgf("abort\n%s", reason)
 
 	// change the state to aborted
-	if err := s.setState(ABORTED, nil); err != nil {
+	if err := s.setState(SSMState_ABORTED, nil); err != nil {
 		return nil, errors.Wrap(err, "Error setting state to aborted")
 	}
 
@@ -1200,7 +1201,7 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
 
 	// unsegmented request
 	if len(apduConfirmedRequest.GetSegment()) <= 0 {
-		if err := s.setState(AWAIT_RESPONSE, nil); err != nil {
+		if err := s.setState(SSMState_AWAIT_RESPONSE, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		return s.Request(apdu)
@@ -1228,7 +1229,7 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
 	// initialize the state
 	s.lastSequenceNumber = 0
 	s.initialSequenceNumber = 0
-	if err := s.setState(SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
+	if err := s.setState(SSMState_SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
 		return errors.Wrap(err, "Error setting state to aborted")
 	}
 
@@ -1243,7 +1244,7 @@ func (s *ServerSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 
 	// some kind of problem
 	if _, ok := apdu.(readWriteModel.APDUAbortExactly); ok {
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		return s.Response(apdu)
@@ -1312,7 +1313,7 @@ func (s *ServerSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 
 		// forward the whole thing to the application
 		applicationTimeout := s.ssmSAP.GetApplicationTimeout()
-		if err := s.setState(AWAIT_RESPONSE, &applicationTimeout); err != nil {
+		if err := s.setState(SSMState_AWAIT_RESPONSE, &applicationTimeout); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		// TODO: here we need to rebuild again yada yada
@@ -1348,7 +1349,7 @@ func (s *ServerSSM) segmentedRequestTimeout() error {
 	log.Debug().Msg("segmentedRequestTimeout")
 
 	// give up
-	if err := s.setState(ABORTED, nil); err != nil {
+	if err := s.setState(SSMState_ABORTED, nil); err != nil {
 		return errors.Wrap(err, "Error setting state to aborted")
 	}
 	return nil
@@ -1364,7 +1365,7 @@ func (s *ServerSSM) awaitResponse(apdu readWriteModel.APDU) error {
 		log.Debug().Msg("client aborting this request")
 
 		// forward to the application
-		if err := s.setState(ABORTED, nil); err != nil {
+		if err := s.setState(SSMState_ABORTED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		if err := s.Request(apdu); err != nil { // send it ot the device
@@ -1410,7 +1411,7 @@ func (s *ServerSSM) segmentedResponse(apdu readWriteModel.APDU) error {
 		} else if s.sentAllSegments {
 			// final ack received?
 			log.Debug().Msg("all done sending response")
-			if err := s.setState(COMPLETED, nil); err != nil {
+			if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 				return errors.Wrap(err, "Error setting state to aborted")
 			}
 		} else {
@@ -1427,7 +1428,7 @@ func (s *ServerSSM) segmentedResponse(apdu readWriteModel.APDU) error {
 		}
 	// some kind of problem
 	case readWriteModel.APDUAbortExactly:
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		if err := s.Response(apdu); err != nil { // send it ot the application
@@ -1451,7 +1452,7 @@ func (s *ServerSSM) segmentedResponseTimeout() error {
 		}
 	} else {
 		// five up
-		if err := s.setState(ABORTED, nil); err != nil {
+		if err := s.setState(SSMState_ABORTED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 	}
@@ -1462,7 +1463,7 @@ type StateMachineAccessPoint struct {
 	*Client
 	*ServiceAccessPoint
 
-	localDevice           LocalDeviceObject
+	localDevice           *local.LocalDeviceObject
 	deviceInventory       *DeviceInfoCache
 	nextInvokeId          uint8
 	clientTransactions    []*ClientSSM
@@ -1478,7 +1479,7 @@ type StateMachineAccessPoint struct {
 	applicationTimeout    uint
 }
 
-func NewStateMachineAccessPoint(localDevice LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
+func NewStateMachineAccessPoint(localDevice *local.LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
 	log.Debug().Msgf("NewStateMachineAccessPoint localDevice=%v deviceInventory=%v sap=%v cid=%v", localDevice, deviceInventory, sapID, cid)
 
 	s := &StateMachineAccessPoint{
@@ -1795,7 +1796,7 @@ func (s *StateMachineAccessPoint) GetDeviceInfoCache() *DeviceInfoCache {
 	return s.deviceInventory
 }
 
-func (s *StateMachineAccessPoint) GetLocalDevice() LocalDeviceObject {
+func (s *StateMachineAccessPoint) GetLocalDevice() *local.LocalDeviceObject {
 	return s.localDevice
 }
 
@@ -1860,7 +1861,7 @@ func (a *ApplicationServiceAccessPoint) Indication(apdu readWriteModel.APDU) err
 			log.Debug().Err(errorFound).Msg("got error")
 
 			// TODO: map it to a error... code temporary placeholder
-			a.Response(readWriteModel.NewAPDUReject(apdu.GetInvokeId(), nil, 0))
+			return a.Response(readWriteModel.NewAPDUReject(apdu.GetInvokeId(), nil, 0))
 		}
 	case readWriteModel.APDUUnconfirmedRequestExactly:
 		//assume no errors found
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 41d0107507..b974e4eeda 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -23,6 +23,8 @@ import (
 	"bytes"
 	"encoding/binary"
 	"fmt"
+	"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
+	"github.com/apache/plc4x/plc4go/internal/bacnetip/service"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
@@ -199,30 +201,223 @@ func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
 	return nil
 }
 
-// TODO: implement
+// TODO: finish
+// Application is the base application: it keeps the local device, the locally known objects
+// and the helper functions Indication dispatches to.
 type Application struct {
-	ApplicationServiceElement
+	*ApplicationServiceElement
 	Collector
+
+	// local objects by name
+	objectName       map[string]*local.LocalDeviceObject
+	// local objects by identifier
+	objectIdentifier map[string]*local.LocalDeviceObject
+	// the local device handed to NewApplication (may be nil)
+	localDevice      *local.LocalDeviceObject
+	// the cache handed to NewApplication or a default one
+	deviceInfoCache  *DeviceInfoCache
+	// controllers for managing confirmed requests as a client
+	controllers      map[string]interface{}
+	// helper functions looked up by Indication under the key "do_<APDU type>"
+	helpers          map[string]func(apdu readWriteModel.APDU) error
+}
+
+// NewApplication creates an Application bound to a fresh ApplicationServiceElement.
+//
+// localDevice may be nil; when given it is bound to the new application (localDevice.App) and
+// registered in both lookup maps. deviceInfoCache may be nil, in which case a default
+// (zero value) cache is used. Any error of NewApplicationServiceElement is returned as is.
+func NewApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*Application, error) {
+	log.Debug().Msgf("NewApplication %v %s deviceInfoCache=%v aseID=%d", localDevice, localAddress, deviceInfoCache, aseID)
+	a := &Application{}
+	var err error
+	a.ApplicationServiceElement, err = NewApplicationServiceElement(aseID, a)
+	if err != nil {
+		return nil, err
+	}
+
+	// local objects by ID and name
+	a.objectName = map[string]*local.LocalDeviceObject{}
+	a.objectIdentifier = map[string]*local.LocalDeviceObject{}
+
+	// keep track of the local device
+	if localDevice != nil {
+		a.localDevice = localDevice
+
+		// bind the device object to this application
+		localDevice.App = a
+
+		// local objects by ID and name: the identifier belongs into objectIdentifier,
+		// writing it into objectName would leave the identifier lookup empty
+		a.objectName[localDevice.ObjectName] = localDevice
+		a.objectIdentifier[localDevice.ObjectIdentifier] = localDevice
+	}
+
+	// use the provided cache or make a default one
+	if deviceInfoCache == nil {
+		var newDeviceInfoCache DeviceInfoCache
+		deviceInfoCache = &newDeviceInfoCache
+	}
+	a.deviceInfoCache = deviceInfoCache
+
+	// controllers for managing confirmed requests as a client
+	a.controllers = map[string]interface{}{}
+
+	// now set up the rest of the capabilities
+	a.Collector = Collector{}
+
+	// TODO: no idea how to handle the capabilities
+	return a, nil
+}
+
+func (a *Application) Request(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Request\n%s", apdu)
+
+	// double-check the input is the right kind of APDU
+	switch apdu.(type) {
+	case readWriteModel.APDUUnconfirmedRequestExactly, readWriteModel.APDUConfirmedRequestExactly:
+	default:
+		return errors.New("APDU expected")
+	}
+	return a.ApplicationServiceElement.Request(apdu)
 }
 
-// TODO: implement
-type IOController struct {
+// Indication dispatches an incoming APDU to the helper registered under "do_<APDU type>".
+//
+// Without a helper a confirmed request yields an error, anything else is ignored. When the
+// helper fails an error APDU is sent back; a failure to send that response is returned.
+func (a *Application) Indication(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Indication\n%s", apdu)
+
+	// get a helper function
+	helperName := fmt.Sprintf("do_%T", apdu)
+	helperFn := a.helpers[helperName]
+	log.Debug().Msgf("helperFn: %s == %t", helperName, helperFn != nil)
+
+	// send back a reject for unrecognized services
+	if helperFn == nil {
+		if _, ok := apdu.(readWriteModel.APDUConfirmedRequestExactly); ok {
+			return errors.Errorf("no function %s", helperName)
+		}
+		return nil
+	}
+
+	if err := helperFn(apdu); err != nil {
+		log.Debug().Err(err).Msgf("err result")
+		// TODO: do proper mapping
+		// don't drop a failed send on the floor, the peer would wait for an answer that never comes
+		if err := a.Response(readWriteModel.NewAPDUError(0, readWriteModel.BACnetConfirmedServiceChoice_CREATE_OBJECT, nil, 0)); err != nil {
+			return errors.Wrap(err, "error sending error response")
+		}
+	}
+
+	return nil
 }
 
-// TODO: implement
+// TODO: finish
 type ApplicationIOController struct {
-	IOController
-	Application
+	*IOController
+	*Application
+	queueByAddress map[string]SieveQueue
 }
 
-func NewApplicationIOController(interface{}, interface{}, interface{}, *int) (*ApplicationIOController, error) {
-	return &ApplicationIOController{}, nil
+func NewApplicationIOController(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*ApplicationIOController, error) {
+	a := &ApplicationIOController{
+		// queues for each address
+		queueByAddress: make(map[string]SieveQueue),
+	}
+	var err error
+	a.IOController, err = NewIOController("", a)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating io controller")
+	}
+	a.Application, err = NewApplication(localDevice, localAddress, deviceInfoCache, aseID)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating application")
+	}
+	return a, nil
+}
+
+func (a *ApplicationIOController) ProcessIO(iocb _IOCB) error {
+	log.Debug().Msgf("ProcessIO %s", iocb)
+
+	// get the destination address from the pdu
+	destinationAddress := iocb.getDestination()
+	log.Debug().Msgf("destinationAddress %s", destinationAddress)
+
+	// look up the queue
+	queue, ok := a.queueByAddress[destinationAddress.String()]
+	if !ok {
+		newQueue, _ := NewSieveQueue(a._AppRequest, destinationAddress)
+		queue = *newQueue
+		a.queueByAddress[destinationAddress.String()] = queue
+	}
+	log.Debug().Msgf("queue %v", queue)
+
+	// ask the queue to process the request
+	return queue.RequestIO(iocb)
+}
+
+func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("_AppComplete %s\n%s", address, apdu)
+
+	// look up the queue
+	queue, ok := a.queueByAddress[address.String()]
+	if !ok {
+		log.Debug().Msgf("no queue for %s", address)
+		return nil
+	}
+	log.Debug().Msgf("queue %v", queue)
+
+	// make sure it has an active iocb
+	if queue.activeIOCB == nil {
+		log.Debug().Msgf("no active request for %s", address)
+		return nil
+	}
+
+	// this request is complete
+	switch apdu.(type) {
+	case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly:
+		queue.CompleteIO(queue.activeIOCB, apdu)
+	case readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly, readWriteModel.APDUAbortExactly:
+		// TODO: extract error
+		queue.AbortIO(queue.activeIOCB, errors.Errorf("%s", apdu))
+	default:
+		return errors.New("unrecognized APDU type")
+	}
+	log.Debug().Msg("controller finished")
+	// if the queue is empty and idle, forget about the controller
+	if len(queue.ioQueue.queue) == 0 && queue.activeIOCB == nil {
+		delete(a.queueByAddress, address.String())
+	}
+	return nil
+}
+
+// _AppRequest sends a request downstream on behalf of a queue. It deliberately bypasses
+// ApplicationIOController.Request: that guard refuses confirmed requests, and calling it first
+// would additionally send unconfirmed requests twice. Errors are logged, not returned.
+func (a *ApplicationIOController) _AppRequest(apdu readWriteModel.APDU) {
+	log.Debug().Msgf("_AppRequest\n%s", apdu)
+
+	// send it downstream, bypass the guard
+	if err := a.Application.Request(apdu); err != nil {
+		log.Error().Err(err).Msg("Uh oh")
+		return
+	}
+
+	// if this was an unconfirmed request, it's complete, no message
+	if _, ok := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); ok {
+		// TODO: where to get the destination now again??
+		if err := a._AppComplete(nil, apdu); err != nil {
+			log.Debug().Err(err).Msg("error completing unconfirmed request")
+		}
+	}
+}
+
+func (a *ApplicationIOController) Request(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Request\n%s", apdu)
+
+	// if this is not unconfirmed request, tell the application to use the IOCB interface
+	if _, ok := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); !ok {
+		return errors.New("use IOCB for confirmed requests")
+	}
+
+	// send it downstream
+	return a.Application.Request(apdu)
+}
+
+// Confirmation receives an ack, error, reject or abort and completes the matching request.
+// The result of _AppComplete is returned so that a failed completion is not lost.
+func (a *ApplicationIOController) Confirmation(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Confirmation\n%s", apdu)
+
+	// this is an ack, error, reject or abort
+	// TODO: where to get the destination now again??
+	return a._AppComplete(nil, apdu)
 }
 
 type BIPSimpleApplication struct {
 	*ApplicationIOController
-	*WhoIsIAmServices
-	*ReadWritePropertyServices
+	*service.WhoIsIAmServices
+	*service.ReadWritePropertyServices
 	localAddress interface{}
 	asap         *ApplicationServiceAccessPoint
 	smap         *StateMachineAccessPoint
@@ -233,7 +428,7 @@ type BIPSimpleApplication struct {
 	mux          *UDPMultiplexer
 }
 
-func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
 	b := &BIPSimpleApplication{}
 	var err error
 	b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go
deleted file mode 100644
index 33c36205e6..0000000000
--- a/plc4go/internal/bacnetip/Device.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
-
-type WhoIsIAmServices struct {
-}
-
-func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
-	// TODO: implement me
-	return nil, nil
-}
-
-var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
-var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
-
-// _LocalDeviceObjectDefault is a device entry with default entries
-var _LocalDeviceObjectDefault = LocalDeviceObject{
-	MaximumApduLengthAccepted: &defaultMaxApduLength,
-	SegmentationSupported:     readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
-	MaxSegmentsAccepted:       &defaultMaxSegmentsAccepted,
-	APDUSegmentTimeout:        5000,
-	APDUTimeout:               3000,
-	NumberOfAPDURetries:       3,
-}
-
-type LocalDeviceObject struct {
-	NumberOfAPDURetries       uint
-	APDUTimeout               uint
-	SegmentationSupported     readWriteModel.BACnetSegmentation
-	APDUSegmentTimeout        uint
-	MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
-	MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
-}
-
-func NewLocalDeviceObject() *LocalDeviceObject {
-	return &LocalDeviceObject{
-		NumberOfAPDURetries:       _LocalDeviceObjectDefault.NumberOfAPDURetries,
-		APDUTimeout:               _LocalDeviceObjectDefault.APDUTimeout,
-		SegmentationSupported:     _LocalDeviceObjectDefault.SegmentationSupported,
-		APDUSegmentTimeout:        _LocalDeviceObjectDefault.APDUSegmentTimeout,
-		MaxSegmentsAccepted:       _LocalDeviceObjectDefault.MaxSegmentsAccepted,
-		MaximumApduLengthAccepted: _LocalDeviceObjectDefault.MaximumApduLengthAccepted,
-	}
-}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 330939d8bb..641a81ad37 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -112,18 +112,18 @@ func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event ap
 
 type ApplicationManager struct {
 	sync.Mutex
-	applications map[string]spi.MessageCodec
+	applications map[string]*ApplicationLayerMessageCodec
 }
 
-func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) {
+func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (*ApplicationLayerMessageCodec, error) {
 	var localAddress *net.UDPAddr
+	var remoteAddr *net.UDPAddr
 	{
 		host := transportUrl.Host
 		port := transportUrl.Port()
 		if transportUrl.Port() == "" {
 			port = options["defaultUdpPort"][0]
 		}
-		var remoteAddr *net.UDPAddr
 		if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
 			panic(err)
 		} else {
@@ -141,7 +141,7 @@ func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Trans
 	defer a.Unlock()
 	messageCodec, ok := a.applications[localAddress.String()]
 	if !ok {
-		newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress)
+		newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress, remoteAddr)
 		if err != nil {
 			return nil, errors.Wrap(err, "error creating application layer code")
 		}
diff --git a/plc4go/internal/bacnetip/IOCBModule.go b/plc4go/internal/bacnetip/IOCBModule.go
new file mode 100644
index 0000000000..068ed17818
--- /dev/null
+++ b/plc4go/internal/bacnetip/IOCBModule.go
@@ -0,0 +1,589 @@
+package bacnetip
+
+import (
+	"container/heap"
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+	"github.com/apache/plc4x/plc4go/spi/plcerrors"
+	"github.com/apache/plc4x/plc4go/spi/utils"
+	"github.com/pkg/errors"
+	"github.com/rs/zerolog/log"
+	"net"
+	"sync"
+	"time"
+)
+
+// IOCBState is the processing state of an IOCB.
+type IOCBState int
+
+const (
+	// IOCBState_IDLE is the state NewIOCB starts a block with.
+	IOCBState_IDLE IOCBState = iota
+	// IOCBState_PENDING is the state IOQueue.Put requires before a block may be queued.
+	IOCBState_PENDING
+	IOCBState_ACTIVE
+	// IOCBState_COMPLETED is set by IOCB.Complete when no controller is attached.
+	IOCBState_COMPLETED
+	// IOCBState_ABORTED is set by IOCB.Abort when no controller is attached.
+	IOCBState_ABORTED
+)
+
+// String returns the upper case name of the state or "Unknown" for values outside the
+// declared constants.
+func (i IOCBState) String() string {
+	names := [...]string{
+		IOCBState_IDLE:      "IDLE",
+		IOCBState_PENDING:   "PENDING",
+		IOCBState_ACTIVE:    "ACTIVE",
+		IOCBState_COMPLETED: "COMPLETED",
+		IOCBState_ABORTED:   "ABORTED",
+	}
+	if i < 0 || int(i) >= len(names) {
+		return "Unknown"
+	}
+	return names[i]
+}
+
+// IOQControllerStates enumerates the states of a queueing IO controller.
+type IOQControllerStates int
+
+const (
+	// IOQControllerStates_CTRL_IDLE is the zero value.
+	IOQControllerStates_CTRL_IDLE IOQControllerStates = iota
+	IOQControllerStates_CTRL_ACTIVE
+	IOQControllerStates_CTRL_WAITING
+)
+
+// String returns the state name without the CTRL_ prefix or "Unknown" for values outside the
+// declared constants.
+func (i IOQControllerStates) String() string {
+	names := [...]string{
+		IOQControllerStates_CTRL_IDLE:    "IDLE",
+		IOQControllerStates_CTRL_ACTIVE:  "ACTIVE",
+		IOQControllerStates_CTRL_WAITING: "WAITING",
+	}
+	if i < 0 || int(i) >= len(names) {
+		return "Unknown"
+	}
+	return names[i]
+}
+
+// _IOCB is the view on an I/O control block used by queues and controllers; *IOCB implements it.
+type _IOCB interface {
+	setIOController(ioController _IOController)
+	setIOState(newState IOCBState)
+	getIOState() IOCBState
+	setIOResponse(msg readWriteModel.APDU)
+	// Trigger sets the completion event and makes the callback(s)
+	Trigger()
+	setIOError(err error)
+	getRequest() readWriteModel.APDU
+	getDestination() net.Addr
+	// getPriority is the priority IOQueue.Put sorts the block by
+	getPriority() int
+	clearQueue()
+	// Abort is called by a client to abort a transaction
+	Abort(err error) error
+}
+
+// _identNext is the ID the next IOCB gets, _identLock guards it.
+var _identNext = 1
+var _identLock sync.Mutex
+
+// IOCB is an I/O control block: it carries one request together with its processing state,
+// the response or error, completion callbacks and an optional timeout.
+type IOCB struct {
+	ioID         int
+	request      readWriteModel.APDU
+	destination  net.Addr
+	ioState      IOCBState
+	ioResponse   readWriteModel.APDU
+	ioError      error
+	ioController _IOController
+	// ioComplete is the completion event; its Locker is installed by completionLocker
+	ioComplete sync.Cond
+	// ioCompleteInit guards the one-time installation of ioComplete.L
+	ioCompleteInit sync.Once
+	// ioCompleteDone is set by Trigger and guarded by ioComplete.L
+	ioCompleteDone bool
+	ioCallback     []func()
+	ioQueue        []_IOCB
+	// ioTimeout and ioTimoutCancel exist only while a timeout set by SetTimeout is pending
+	ioTimeout      *time.Timer
+	ioTimoutCancel chan interface{}
+	priority       int
+}
+
+// NewIOCB creates an idle IOCB for the given request and destination and gives it a unique ID.
+// The returned error is always nil.
+func NewIOCB(request readWriteModel.APDU, destination net.Addr) (*IOCB, error) {
+	// lock the identity sequence number
+	_identLock.Lock()
+
+	// generate a unique identity for this block
+	ioID := _identNext
+	_identNext++
+
+	// release the lock
+	_identLock.Unlock()
+
+	//  debugging postponed until ID acquired
+	log.Debug().Msgf("NewIOCB(%d)", ioID)
+
+	return &IOCB{
+		// save the ID
+		ioID: ioID,
+
+		// save the request parameter
+		request:     request,
+		destination: destination,
+
+		// start with an idle request
+		ioState: IOCBState_IDLE,
+	}, nil
+}
+
+// completionLocker returns the Locker of the completion event and installs it on first use.
+// A sync.Cond without a Locker panics in Wait, so every access goes through here.
+func (i *IOCB) completionLocker() sync.Locker {
+	i.ioCompleteInit.Do(func() {
+		if i.ioComplete.L == nil {
+			i.ioComplete.L = &sync.Mutex{}
+		}
+	})
+	return i.ioComplete.L
+}
+
+// cancelTimeout stops a pending timeout and releases the goroutine watching it. It is safe to
+// call when no timeout is set and when the timeout was cancelled before.
+func (i *IOCB) cancelTimeout() {
+	if i.ioTimeout != nil {
+		log.Debug().Msg("cancel timeout")
+		i.ioTimeout.Stop()
+		i.ioTimeout = nil
+	}
+	if i.ioTimoutCancel != nil {
+		close(i.ioTimoutCancel)
+		// forget the channel so that it can't be closed twice
+		i.ioTimoutCancel = nil
+	}
+}
+
+// AddCallback Pass a function to be called when IO is complete.
+// When the IO is complete already, all callbacks are triggered right away.
+func (i *IOCB) AddCallback(fn func()) {
+	log.Debug().Msgf("AddCallback(%d): %t", i.ioID, fn != nil)
+	// store it
+	i.ioCallback = append(i.ioCallback, fn)
+
+	// already complete?
+	locker := i.completionLocker()
+	locker.Lock()
+	alreadyComplete := i.ioCompleteDone
+	locker.Unlock()
+	if alreadyComplete {
+		i.Trigger()
+	}
+}
+
+// Wait for the completion event to be set. Returns immediately when it is set already.
+func (i *IOCB) Wait() {
+	log.Debug().Msgf("Wait(%d)", i.ioID)
+	locker := i.completionLocker()
+	locker.Lock()
+	defer locker.Unlock()
+	// sync.Cond.Wait needs the lock held and may wake up spuriously, hence the loop
+	for !i.ioCompleteDone {
+		i.ioComplete.Wait()
+	}
+}
+
+// Trigger Set the completion event and make the callback(s)
+func (i *IOCB) Trigger() {
+	log.Debug().Msgf("Trigger(%d)", i.ioID)
+
+	// if it's queued, remove it from its queue
+	myIndex := -1
+	var meAsInterface _IOCB = i
+	for index, qe := range i.ioQueue {
+		if qe == meAsInterface {
+			myIndex = index
+		}
+	}
+	if myIndex >= 0 {
+		log.Debug().Msg("dequeue")
+		i.ioQueue = append(i.ioQueue[:myIndex], i.ioQueue[myIndex+1:]...)
+	}
+
+	// if there's a timer, cancel it (this also ends the goroutine started by SetTimeout)
+	i.cancelTimeout()
+
+	// set the completion event
+	locker := i.completionLocker()
+	locker.Lock()
+	i.ioCompleteDone = true
+	locker.Unlock()
+	i.ioComplete.Broadcast()
+	log.Debug().Msg("complete event set")
+
+	// make callback(s)
+	for _, f := range i.ioCallback {
+		f()
+	}
+}
+
+// Complete Called to complete a transaction, usually when ProcessIO has shipped the IOCB off to some other thread or
+//        function.
+// With a controller attached the completion is delegated to it and its result is returned.
+func (i *IOCB) Complete(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Complete(%d)\n%s", i.ioID, apdu)
+
+	if i.ioController != nil {
+		// pass to the controller
+		return i.ioController.CompleteIO(i, apdu)
+	}
+
+	// just fill in the data
+	i.ioState = IOCBState_COMPLETED
+	i.ioResponse = apdu
+	i.Trigger()
+	return nil
+}
+
+// Abort Called by a client to abort a transaction.
+// With a controller attached the abort is delegated to it and its result is returned. A pending
+// timeout is cancelled in any case; aborting without a timeout or aborting twice is fine.
+func (i *IOCB) Abort(err error) error {
+	log.Debug().Err(err).Msgf("Abort(%d)", i.ioID)
+	defer i.cancelTimeout()
+
+	if i.ioController != nil {
+		// pass to the controller
+		return i.ioController.AbortIO(i, err)
+	}
+
+	// just fill in the data
+	i.ioState = IOCBState_ABORTED
+	i.ioError = err
+	i.Trigger()
+	return nil
+}
+
+// SetTimeout Called to set a transaction timer.
+// When the timer fires the IOCB is aborted with a timeout error carrying the elapsed time.
+func (i *IOCB) SetTimeout(delay time.Duration) {
+	// if one has already been created, re-arm it
+	if i.ioTimeout != nil {
+		i.ioTimeout.Reset(delay)
+		return
+	}
+
+	start := time.Now()
+	// the goroutine works on its own references as cancelTimeout clears the fields
+	timer := time.NewTimer(delay)
+	cancel := make(chan interface{})
+	i.ioTimeout = timer
+	i.ioTimoutCancel = cancel
+	go func() {
+		select {
+		case firedAt := <-timer.C:
+			// elapsed time is "fired - start", the other way round it would be negative
+			_ = i.Abort(plcerrors.NewTimeoutError(firedAt.Sub(start)))
+		case <-cancel:
+		}
+	}()
+}
+
+// setIOController stores the controller Complete and Abort delegate to.
+func (i *IOCB) setIOController(ioController _IOController) {
+	i.ioController = ioController
+}
+
+// setIOState overwrites the processing state.
+func (i *IOCB) setIOState(newState IOCBState) {
+	i.ioState = newState
+}
+
+// getIOState returns the processing state.
+func (i *IOCB) getIOState() IOCBState {
+	return i.ioState
+}
+
+// setIOResponse stores the response.
+func (i *IOCB) setIOResponse(msg readWriteModel.APDU) {
+	i.ioResponse = msg
+}
+
+// setIOError stores the error.
+func (i *IOCB) setIOError(err error) {
+	i.ioError = err
+}
+
+// getRequest returns the request given to NewIOCB.
+func (i *IOCB) getRequest() readWriteModel.APDU {
+	return i.request
+}
+
+// getDestination returns the destination given to NewIOCB.
+func (i *IOCB) getDestination() net.Addr {
+	return i.destination
+}
+
+// getPriority returns the priority of this block.
+func (i *IOCB) getPriority() int {
+	return i.priority
+}
+
+// clearQueue drops the queue reference of this block.
+func (i *IOCB) clearQueue() {
+	i.ioQueue = nil
+}
+
+// A PriorityItem is something we manage in a priority queue.
+type PriorityItem struct {
+	value    _IOCB // The value of the item; arbitrary.
+	priority int   // The priority of the item in the queue.
+	// The index is needed by update and is maintained by the heap.Interface methods.
+	index int // The index of the item in the heap.
+}
+
+// PriorityQueue is a heap.Interface implementation over PriorityItems; the item with the
+// highest priority is popped first.
+type PriorityQueue []*PriorityItem
+
+// Len is the number of items in the queue.
+func (pq PriorityQueue) Len() int {
+	return len(pq)
+}
+
+// Less orders by descending priority so that heap.Pop yields the highest priority.
+func (pq PriorityQueue) Less(i, j int) bool {
+	left, right := pq[i], pq[j]
+	return right.priority < left.priority
+}
+
+// Swap exchanges two items and keeps their index fields in sync with their position.
+func (pq PriorityQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].index, pq[j].index = i, j
+}
+
+// Push appends x, which has to be a *PriorityItem, and records its position.
+func (pq *PriorityQueue) Push(x any) {
+	entry := x.(*PriorityItem)
+	entry.index = len(*pq)
+	*pq = append(*pq, entry)
+}
+
+// Pop removes and returns the last item as *PriorityItem.
+func (pq *PriorityQueue) Pop() any {
+	items := *pq
+	last := len(items) - 1
+	entry := items[last]
+	items[last] = nil // avoid memory leak
+	entry.index = -1  // for safety
+	*pq = items[:last]
+	return entry
+}
+
+// update changes value and priority of an item that is in the queue and restores the heap order.
+func (pq *PriorityQueue) update(item *PriorityItem, value _IOCB, priority int) {
+	item.value, item.priority = value, priority
+	heap.Fix(pq, item.index)
+}
+
+// IOQueue is a priority queue of pending IOCBs. Queues have to be created with NewIOQueue,
+// which installs the lock needed for blocking in Get.
+type IOQueue struct {
+	// notEmpty is signalled by Put; its Locker guards queue
+	notEmpty sync.Cond
+	queue    PriorityQueue
+}
+
+// NewIOQueue creates an empty queue; name is only used for logging.
+func NewIOQueue(name string) *IOQueue {
+	log.Debug().Msgf("NewIOQueue %s", name)
+	ioQueue := &IOQueue{}
+	// a sync.Cond without Locker panics in Wait
+	ioQueue.notEmpty.L = &sync.Mutex{}
+	return ioQueue
+}
+
+// withLock runs fn while holding the lock of notEmpty. A queue that was not created by
+// NewIOQueue has no lock, fn then runs unguarded.
+func (i *IOQueue) withLock(fn func()) {
+	if locker := i.notEmpty.L; locker != nil {
+		locker.Lock()
+		defer locker.Unlock()
+	}
+	fn()
+}
+
+// Put an IOCB to a queue.  This is usually called by the function that filters requests and passes them out to the
+//        correct processing thread.
+// Only pending IOCBs are accepted, anything else yields an error.
+func (i *IOQueue) Put(iocb _IOCB) error {
+	log.Debug().Msgf("Put %s", iocb)
+
+	// requests should be pending before being queued
+	if iocb.getIOState() != IOCBState_PENDING {
+		return errors.New("invalid state transition")
+	}
+
+	// add the request to the queue; PriorityQueue.Push expects a pointer, a plain
+	// PriorityItem value would make its type assertion panic
+	priority := iocb.getPriority()
+	i.withLock(func() {
+		heap.Push(&i.queue, &PriorityItem{value: iocb, priority: priority})
+	})
+
+	i.notEmpty.Broadcast()
+	return nil
+}
+
+// Get a request from a queue, optionally block until a request is available.
+//
+// With block=false an empty queue returns (nil, nil) right away. With block=true and a delay the
+// call returns (nil, nil) when nothing arrived in time; without a delay it waits until a request
+// is available. Blocking on a queue that was not created by NewIOQueue yields an error.
+func (i *IOQueue) Get(block bool, delay *time.Duration) (_IOCB, error) {
+	log.Debug().Msgf("Get block=%t, delay=%s", block, delay)
+
+	locker := i.notEmpty.L
+	if locker != nil {
+		locker.Lock()
+		defer locker.Unlock()
+	}
+
+	if len(i.queue) == 0 {
+		// if the queue is empty, and we do not block return None
+		if !block {
+			log.Debug().Msgf("not blocking and empty")
+			return nil, nil
+		}
+		if locker == nil {
+			return nil, errors.New("can't block on a queue not created by NewIOQueue")
+		}
+
+		// wait for something to be in the queue
+		if delay == nil {
+			for len(i.queue) == 0 {
+				i.notEmpty.Wait()
+			}
+		} else {
+			deadline := time.Now().Add(*delay)
+			// wake this waiter at the deadline; taking the lock makes sure the waiter is
+			// already parked in Wait when the broadcast happens
+			wakeUp := time.AfterFunc(*delay, func() {
+				locker.Lock()
+				i.notEmpty.Broadcast()
+				locker.Unlock()
+			})
+			defer utils.CleanupTimer(wakeUp)
+			for len(i.queue) == 0 && time.Now().Before(deadline) {
+				i.notEmpty.Wait()
+			}
+		}
+	}
+
+	if len(i.queue) == 0 {
+		return nil, nil
+	}
+
+	// extract the first element; the heap holds pointers
+	item := heap.Pop(&i.queue).(*PriorityItem)
+	iocb := item.value
+	iocb.clearQueue()
+
+	// return the request
+	return iocb, nil
+}
+
+// Remove a control block from the queue, called if the request
+//        is canceled/aborted
+// Removing a block that is not queued is not an error.
+func (i *IOQueue) Remove(iocb _IOCB) error {
+	i.withLock(func() {
+		for _, item := range i.queue {
+			if iocb == item.value {
+				heap.Remove(&i.queue, item.index)
+				return
+			}
+		}
+	})
+	return nil
+}
+
+//Abort all the control blocks in the queue
+func (i *IOQueue) Abort(err error) {
+	// take the blocks out first so that an aborted block calling back into this queue
+	// neither deadlocks nor changes the slice being iterated
+	var pending PriorityQueue
+	i.withLock(func() {
+		pending = i.queue
+		i.queue = nil
+	})
+
+	for _, item := range pending {
+		item.value.clearQueue()
+		// best effort: one failing abort must not keep the others from being aborted
+		_ = item.value.Abort(err)
+	}
+}
+
+// _IOController describes what an IOController needs from its rootStruct, so that a struct embedding IOController
+// can provide its own implementation of these operations.
+type _IOController interface {
+	// Abort all requests.
+	Abort(err error) error
+	// ProcessIO figures out how to respond to a request; IOController.RequestIO calls it on the rootStruct.
+	ProcessIO(iocb _IOCB) error
+	// CompleteIO is called by a handler to return data to the client.
+	CompleteIO(iocb _IOCB, apdu readWriteModel.APDU) error
+	// AbortIO aborts a transaction; IOController.RequestIO calls it on the rootStruct if ProcessIO failed.
+	AbortIO(iocb _IOCB, err error) error
+}
+
+// IOController is the base controller which moves IOCBs through their states (pending, active, completed, aborted).
+// name is the name passed to NewIOController. rootStruct is the implementation RequestIO dispatches ProcessIO and
+// AbortIO to, usually the struct embedding this controller.
+type IOController struct {
+	name       string
+	rootStruct _IOController
+}
+
+// NewIOController creates a controller with the given name which dispatches the processing of requests to
+// rootStruct. The returned error is always nil.
+func NewIOController(name string, rootStruct _IOController) (*IOController, error) {
+	log.Debug().Msgf("NewIOController name=%s", name)
+	controller := &IOController{}
+	// save the name
+	controller.name = name
+	controller.rootStruct = rootStruct
+	return controller, nil
+}
+
+// Abort all requests, no default implementation.
+// This base version ignores err and always returns nil.
+func (i *IOController) Abort(err error) error {
+	return nil
+}
+
+// RequestIO is called by a client to start processing a request: the IOCB gets bound to this controller, becomes
+// pending and is passed to ProcessIO of the root struct. If that fails the request is aborted through AbortIO of
+// the root struct, whose result is returned.
+func (i *IOController) RequestIO(iocb _IOCB) error {
+	log.Debug().Msgf("RequestIO\n%s", iocb)
+
+	// bind the iocb to this controller and mark it as pending
+	iocb.setIOController(i)
+	iocb.setIOState(IOCBState_PENDING)
+
+	// let the derived struct figure out how to process this, a failure aborts the request
+	if err := i.rootStruct.ProcessIO(iocb); err != nil {
+		return i.rootStruct.AbortIO(iocb, err)
+	}
+	return nil
+}
+
+// ProcessIO Figure out how to respond to this request.  This must be provided by the derived class.
+// This base version doesn't touch the IOCB and always returns an error stating that it has to be implemented.
+func (i *IOController) ProcessIO(iocb _IOCB) error {
+	return errors.New("IOController must implement process_io()")
+}
+
+// ActiveIO is called by a handler to notify the controller that a request is being processed. Only idle or pending
+// requests can become active, any other state results in an error and leaves the IOCB unchanged.
+func (i *IOController) ActiveIO(iocb _IOCB) error {
+	log.Debug().Msgf("ActiveIO %s", iocb)
+
+	switch state := iocb.getIOState(); state {
+	case IOCBState_IDLE, IOCBState_PENDING:
+		// change the state
+		iocb.setIOState(IOCBState_ACTIVE)
+		return nil
+	default:
+		return errors.Errorf("invalid state transition (currently %d)", state)
+	}
+}
+
+// CompleteIO is called by a handler to return data to the client: the IOCB is marked as completed, gets the
+// response assigned and is triggered. A request which already completed or aborted is left alone.
+// The returned error is always nil.
+func (i *IOController) CompleteIO(iocb _IOCB, apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("CompleteIO %s\n%s", iocb, apdu)
+
+	switch iocb.getIOState() {
+	case IOCBState_COMPLETED, IOCBState_ABORTED:
+		// already finished, nothing to do
+		return nil
+	}
+
+	// record the outcome, then notify the client
+	iocb.setIOState(IOCBState_COMPLETED)
+	iocb.setIOResponse(apdu)
+	iocb.Trigger()
+
+	return nil
+}
+
+// AbortIO is called by a handler or a client to abort a transaction: the IOCB is marked as aborted, gets the error
+// assigned and is triggered. A request which already completed or aborted is left alone.
+// The returned error is always nil.
+func (i *IOController) AbortIO(iocb _IOCB, err error) error {
+	log.Debug().Err(err).Msgf("AbortIO %s", iocb)
+
+	switch iocb.getIOState() {
+	case IOCBState_COMPLETED, IOCBState_ABORTED:
+		// already finished, nothing to do
+		return nil
+	}
+
+	// record the outcome, then notify the client
+	iocb.setIOState(IOCBState_ABORTED)
+	iocb.setIOError(err)
+	iocb.Trigger()
+
+	return nil
+}
+
+// IOQController is an IOController which additionally keeps a controller state, the currently active IOCB and an
+// IOQueue for the IOCBs requested while the controller is not idle.
+type IOQController struct {
+	*IOController
+	state      IOQControllerStates
+	activeIOCB _IOCB
+	ioQueue    *IOQueue
+}
+
+// NewIOQController creates an idle controller without an active IOCB and with an empty IOQueue called
+// "<name> queue". The new controller is registered as root struct of its embedded IOController.
+func NewIOQController(name string) (*IOQController, error) {
+	// start idle and without an active iocb
+	controller := &IOQController{state: IOQControllerStates_CTRL_IDLE}
+
+	base, err := NewIOController(name, controller)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating IO controller")
+	}
+	controller.IOController = base
+	log.Debug().Msgf("%s %s %s", time.Now(), name, controller.state)
+
+	// create an IOQueue for iocb's requested when not idle
+	controller.ioQueue = NewIOQueue(name + " queue")
+
+	return controller, nil
+}
+
+// TODO: implement functions of IOQController
+
+// SieveQueue is an IOQController for one address: ProcessIO hands the request of an IOCB to requestFn.
+type SieveQueue struct {
+	*IOQController
+	requestFn func(apdu readWriteModel.APDU)
+	address   net.Addr
+}
+
+// NewSieveQueue creates a SieveQueue which passes the requests for the given address to fn. The underlying
+// controller is named after address.String(), so address must not be nil.
+func NewSieveQueue(fn func(apdu readWriteModel.APDU), address net.Addr) (*SieveQueue, error) {
+	s := &SieveQueue{}
+	var err error
+	s.IOQController, err = NewIOQController(address.String())
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating a IOQController")
+	}
+
+	// NewIOQController registered itself as root struct, which makes RequestIO end up in IOController.ProcessIO
+	// instead of SieveQueue.ProcessIO. Dispatch to the SieveQueue so its override gets called.
+	s.rootStruct = s
+
+	// Save a reference to the request function
+	s.requestFn = fn
+	s.address = address
+	return s, nil
+}
+
+// ProcessIO marks the IOCB as active and passes its request to the request function.
+// If the IOCB can't become active from its current state, an error is returned and nothing is sent.
+func (s *SieveQueue) ProcessIO(iocb _IOCB) error {
+	log.Debug().Msgf("ProcessIO %s", iocb)
+
+	// this is now an active request
+	if err := s.ActiveIO(iocb); err != nil {
+		return errors.Wrap(err, "error activating request")
+	}
+
+	// send the request
+	s.requestFn(iocb.getRequest())
+	return nil
+}
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 670cc00ddc..45b51af713 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -21,6 +21,7 @@ package bacnetip
 
 import (
 	"context"
+	"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
 	"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
@@ -38,17 +39,23 @@ type ApplicationLayerMessageCodec struct {
 	bipSimpleApplication *BIPSimpleApplication
 	messageCode          *MessageCodec
 	deviceInfoCache      DeviceInfoCache
+
+	localAddress  *net.UDPAddr
+	remoteAddress *net.UDPAddr
 }
 
-func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
+func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr, remoteAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
 	// Have the transport create a new transport-instance.
 	transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating transport instance")
 	}
 	_ = transportInstance
-	a := &ApplicationLayerMessageCodec{}
-	application, err := NewBIPSimpleApplication(LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
+	a := &ApplicationLayerMessageCodec{
+		localAddress:  localAddress,
+		remoteAddress: remoteAddress,
+	}
+	application, err := NewBIPSimpleApplication(&local.LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -81,7 +88,22 @@ func (m *ApplicationLayerMessageCodec) IsRunning() bool {
 }
 
 func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
-	panic("not yet mapped")
+	iocb, err := NewIOCB(message.(model.APDU), m.remoteAddress)
+	if err != nil {
+		return errors.Wrap(err, "error creating IOCB")
+	}
+	go func() {
+		go m.bipSimpleApplication.RequestIO(iocb)
+		iocb.Wait()
+		if iocb.ioError != nil {
+			// TODO: handle error
+		} else if iocb.ioResponse != nil {
+			// TODO: response?
+		} else {
+			// TODO: what now?
+		}
+	}()
+	return nil
 }
 
 func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
diff --git a/plc4go/internal/bacnetip/local/Device.go b/plc4go/internal/bacnetip/local/Device.go
new file mode 100644
index 0000000000..752013a2ff
--- /dev/null
+++ b/plc4go/internal/bacnetip/local/Device.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package local
+
+import (
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+)
+
+// LocalDeviceObject bundles the properties of the local device which is handed to the application layer.
+// NOTE(review): the meaning and the units of the fields (e.g. of the timeouts) are not defined here; presumably
+// they follow the BACnet device object properties of the same name. Confirm before relying on them.
+type LocalDeviceObject struct {
+	NumberOfAPDURetries       uint
+	APDUTimeout               uint
+	SegmentationSupported     readWriteModel.BACnetSegmentation
+	APDUSegmentTimeout        uint
+	MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
+	MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
+	App                       interface{}
+	ObjectName                string
+	ObjectIdentifier          string
+}
diff --git a/plc4go/internal/bacnetip/service/Device.go b/plc4go/internal/bacnetip/service/Device.go
new file mode 100644
index 0000000000..ae8e18ff9b
--- /dev/null
+++ b/plc4go/internal/bacnetip/service/Device.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package service
+
+// WhoIsIAmServices is a placeholder without fields or behavior so far.
+// NOTE(review): presumably meant to provide the BACnet Who-Is/I-Am services; confirm.
+type WhoIsIAmServices struct {
+}
+
+// NewWhoIsIAmServices is not implemented yet. It currently returns a nil *WhoIsIAmServices together with a nil
+// error, so a nil error does not mean that a usable instance was created.
+func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
+	// TODO: implement me
+	return nil, nil
+}
diff --git a/plc4go/internal/bacnetip/service/Object.go b/plc4go/internal/bacnetip/service/Object.go
new file mode 100644
index 0000000000..946f5d5fa5
--- /dev/null
+++ b/plc4go/internal/bacnetip/service/Object.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package service
+
+// ReadWritePropertyServices is a placeholder without fields or behavior so far.
+// NOTE(review): presumably meant to provide the BACnet ReadProperty/WriteProperty services; confirm.
+type ReadWritePropertyServices struct {
+}
+
+// NewReadWritePropertyServices is not implemented yet. It currently returns a nil *ReadWritePropertyServices
+// together with a nil error, so a nil error does not mean that a usable instance was created.
+func NewReadWritePropertyServices() (*ReadWritePropertyServices, error) {
+	// TODO: implement me
+	return nil, nil
+}