You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/15 15:28:14 UTC

[plc4x] 02/02: refactor(plc4go/bacnet): added more application code for protocol

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 25c5e941636f66d19ae70a0704a67362a24cd51f
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Nov 15 16:28:00 2022 +0100

    refactor(plc4go/bacnet): added more application code for protocol
---
 plc4go/internal/bacnetip/ApplicationLayer.go  | 137 +++---
 plc4go/internal/bacnetip/ApplicationModule.go | 219 +++++++++-
 plc4go/internal/bacnetip/Device.go            |  63 ---
 plc4go/internal/bacnetip/Driver.go            |   8 +-
 plc4go/internal/bacnetip/IOCBModule.go        | 589 ++++++++++++++++++++++++++
 plc4go/internal/bacnetip/MessageCodec.go      |  30 +-
 plc4go/internal/bacnetip/local/Device.go      |  36 ++
 plc4go/internal/bacnetip/service/Device.go    |  28 ++
 plc4go/internal/bacnetip/service/Object.go    |  28 ++
 9 files changed, 987 insertions(+), 151 deletions(-)

diff --git a/plc4go/internal/bacnetip/ApplicationLayer.go b/plc4go/internal/bacnetip/ApplicationLayer.go
index 04dc50f415..d7a5ce94f3 100644
--- a/plc4go/internal/bacnetip/ApplicationLayer.go
+++ b/plc4go/internal/bacnetip/ApplicationLayer.go
@@ -21,6 +21,7 @@ package bacnetip
 
 import (
 	"bytes"
+	"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
@@ -31,33 +32,33 @@ import (
 type SSMState uint8
 
 const (
-	IDLE SSMState = iota
-	SEGMENTED_REQUEST
-	AWAIT_CONFIRMATION
-	AWAIT_RESPONSE
-	SEGMENTED_RESPONSE
-	SEGMENTED_CONFIRMATION
-	COMPLETED
-	ABORTED
+	SSMState_IDLE SSMState = iota
+	SSMState_SEGMENTED_REQUEST
+	SSMState_AWAIT_CONFIRMATION
+	SSMState_AWAIT_RESPONSE
+	SSMState_SEGMENTED_RESPONSE
+	SSMState_SEGMENTED_CONFIRMATION
+	SSMState_COMPLETED
+	SSMState_ABORTED
 )
 
 func (s SSMState) String() string {
 	switch s {
-	case IDLE:
+	case SSMState_IDLE:
 		return "IDLE"
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return "SEGMENTED_REQUEST"
-	case AWAIT_CONFIRMATION:
+	case SSMState_AWAIT_CONFIRMATION:
 		return "AWAIT_CONFIRMATION"
-	case AWAIT_RESPONSE:
+	case SSMState_AWAIT_RESPONSE:
 		return "AWAIT_RESPONSE"
-	case SEGMENTED_RESPONSE:
+	case SSMState_SEGMENTED_RESPONSE:
 		return "SEGMENTED_RESPONSE"
-	case SEGMENTED_CONFIRMATION:
+	case SSMState_SEGMENTED_CONFIRMATION:
 		return "SEGMENTED_CONFIRMATION"
-	case COMPLETED:
+	case SSMState_COMPLETED:
 		return "COMPLETED"
-	case ABORTED:
+	case SSMState_ABORTED:
 		return "ABORTED"
 	default:
 		return "Unknown"
@@ -76,7 +77,7 @@ type SSMSAPRequirements interface {
 	_ServiceAccessPoint
 	_Client
 	GetDeviceInfoCache() *DeviceInfoCache
-	GetLocalDevice() LocalDeviceObject
+	GetLocalDevice() *local.LocalDeviceObject
 	GetProposedWindowSize() uint8
 	GetClientTransactions() []*ClientSSM
 	GetServerTransactions() []*ServerSSM
@@ -126,7 +127,7 @@ func NewSSM(sap SSMSAPRequirements, pduAddress []byte) (SSM, error) {
 		ssmSAP:                sap,
 		pduAddress:            pduAddress,
 		deviceInfo:            deviceInfo,
-		state:                 IDLE,
+		state:                 SSMState_IDLE,
 		numberOfApduRetries:   localDevice.NumberOfAPDURetries,
 		apduTimeout:           localDevice.APDUTimeout,
 		segmentationSupported: localDevice.SegmentationSupported,
@@ -163,7 +164,7 @@ func (s *SSM) restartTimer(millis uint) {
 // setState This function is called when the derived class wants to change state
 func (s *SSM) setState(newState SSMState, timer *uint) error {
 	log.Debug().Msgf("setState %s timer=%d", newState, timer)
-	if s.state == COMPLETED || s.state == ABORTED {
+	if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
 		return errors.Errorf("Invalid state transition from %s to %s", s.state, newState)
 	}
 
@@ -360,7 +361,7 @@ func (s *ClientSSM) setState(newState SSMState, timer *uint) error {
 		return errors.Wrap(err, "error during SSM state transition")
 	}
 
-	if s.state == COMPLETED || s.state == ABORTED {
+	if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
 		log.Debug().Msg("remove from active transaction")
 		s.ssmSAP.GetClientTransactions() // TODO remove "this" transaction from the list
 		if s.deviceInfo == nil {
@@ -458,7 +459,7 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 		// unsegmented
 		s.sentAllSegments = true
 		s.retryCount = 0
-		if err := s.setState(AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
+		if err := s.setState(SSMState_AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 	} else {
@@ -468,7 +469,7 @@ func (s *ClientSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 		s.segmentRetryCount = 0
 		s.initialSequenceNumber = 0
 		s.actualWindowSize = nil
-		if err := s.setState(SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
+		if err := s.setState(SSMState_SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 	}
@@ -497,11 +498,11 @@ func (s *ClientSSM) Confirmation(apdu readWriteModel.APDU) error {
 	log.Debug().Msgf("confirmation\n%s", apdu)
 
 	switch s.state {
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return s.segmentedRequest(apdu)
-	case AWAIT_CONFIRMATION:
+	case SSMState_AWAIT_CONFIRMATION:
 		return s.awaitConfirmation(apdu)
-	case SEGMENTED_CONFIRMATION:
+	case SSMState_SEGMENTED_CONFIRMATION:
 		return s.segmentedConfirmation(apdu)
 	default:
 		return errors.Errorf("Invalid state %s", s.state)
@@ -512,13 +513,13 @@ func (s *ClientSSM) Confirmation(apdu readWriteModel.APDU) error {
 func (s *ClientSSM) processTask() error {
 	log.Debug().Msg("processTask")
 	switch s.state {
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return s.segmentedRequestTimeout()
-	case AWAIT_CONFIRMATION:
+	case SSMState_AWAIT_CONFIRMATION:
 		return s.awaitConfirmationTimeout()
-	case SEGMENTED_CONFIRMATION:
+	case SSMState_SEGMENTED_CONFIRMATION:
 		return s.segmentedConfirmationTimeout()
-	case COMPLETED, ABORTED:
+	case SSMState_COMPLETED, SSMState_ABORTED:
 		return nil
 	default:
 		return errors.Errorf("Invalid state %s", s.state)
@@ -530,7 +531,7 @@ func (s *ClientSSM) abort(reason readWriteModel.BACnetAbortReason) (readWriteMod
 	log.Debug().Msgf("abort\n%s", reason)
 
 	// change the state to aborted
-	if err := s.setState(ABORTED, nil); err != nil {
+	if err := s.setState(SSMState_ABORTED, nil); err != nil {
 		return nil, errors.Wrap(err, "Error setting state to aborted")
 	}
 
@@ -558,7 +559,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 		} else if s.sentAllSegments {
 			log.Debug().Msg("all done sending request")
 
-			if err := s.setState(AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
+			if err := s.setState(SSMState_AWAIT_CONFIRMATION, &s.apduTimeout); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 		} else {
@@ -587,7 +588,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 				log.Debug().Err(err).Msg("error sending response")
 			}
 		} else {
-			if err := s.setState(COMPLETED, nil); err != nil {
+			if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 		}
@@ -607,7 +608,7 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 			}
 		} else if !apdu.GetSegmentedMessage() {
 			// ack is not segmented
-			if err := s.setState(COMPLETED, nil); err != nil {
+			if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 			if err := s.Response(apdu); err != nil {
@@ -624,13 +625,13 @@ func (s *ClientSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 			s.actualWindowSize = &minWindowSize
 			s.lastSequenceNumber = 0
 			s.initialSequenceNumber = 0
-			if err := s.setState(SEGMENTED_CONFIRMATION, &s.segmentTimeout); err != nil {
+			if err := s.setState(SSMState_SEGMENTED_CONFIRMATION, &s.segmentTimeout); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 		}
 	case readWriteModel.APDUErrorExactly:
 		log.Debug().Msg("error/reject/abort")
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 		if err := s.Response(apdu); err != nil {
@@ -685,7 +686,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
 	case readWriteModel.APDUAbortExactly:
 		log.Debug().Msg("Server aborted")
 
-		if err := s.setState(ABORTED, nil); err != nil {
+		if err := s.setState(SSMState_ABORTED, nil); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 		if err := s.Response(apdu); err != nil {
@@ -694,7 +695,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
 	case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly:
 		log.Debug().Msg("simple ack, error or reject")
 
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 		if err := s.Response(apdu); err != nil {
@@ -706,7 +707,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
 		if !apdu.GetSegmentedMessage() {
 			log.Debug().Msg("unsegmented")
 
-			if err := s.setState(COMPLETED, nil); err != nil {
+			if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 			if err := s.Response(apdu); err != nil {
@@ -733,7 +734,7 @@ func (s *ClientSSM) awaitConfirmation(apdu readWriteModel.APDU) error {
 			s.actualWindowSize = apdu.GetProposedWindowSize()
 			s.lastSequenceNumber = 0
 			s.initialSequenceNumber = 0
-			if err := s.setState(SEGMENTED_CONFIRMATION, nil); err != nil {
+			if err := s.setState(SSMState_SEGMENTED_CONFIRMATION, nil); err != nil {
 				return errors.Wrap(err, "error switching state")
 			}
 
@@ -858,7 +859,7 @@ func (s *ClientSSM) segmentedConfirmation(apdu readWriteModel.APDU) error {
 			log.Debug().Err(err).Msg("error sending request")
 		}
 
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "error switching state")
 		}
 		// TODO: this is nonsense... We need to parse the service and the apdu not sure where to get it from now...
@@ -928,7 +929,7 @@ func (s *ServerSSM) setState(newState SSMState, timer *uint) error {
 		return errors.Wrap(err, "error during SSM state transition")
 	}
 
-	if s.state == COMPLETED || s.state == ABORTED {
+	if s.state == SSMState_COMPLETED || s.state == SSMState_ABORTED {
 		log.Debug().Msg("remove from active transaction")
 		s.ssmSAP.GetServerTransactions() // TODO remove "this" transaction from the list
 		if s.deviceInfo != nil {
@@ -954,13 +955,13 @@ func (s *ServerSSM) Indication(apdu readWriteModel.APDU) error { // TODO: maybe
 	// make sure we're getting confirmed requests
 
 	switch s.state {
-	case IDLE:
+	case SSMState_IDLE:
 		return s.idle(apdu)
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return s.segmentedRequest(apdu)
-	case AWAIT_RESPONSE:
+	case SSMState_AWAIT_RESPONSE:
 		return s.awaitResponse(apdu)
-	case SEGMENTED_RESPONSE:
+	case SSMState_SEGMENTED_RESPONSE:
 		return s.segmentedResponse(apdu)
 	default:
 		return errors.Errorf("invalid state %s", s.state)
@@ -984,7 +985,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 	log.Debug().Msgf("confirmation\n%s", apdu)
 
 	// check to see we are in the correct state
-	if s.state != AWAIT_RESPONSE {
+	if s.state != SSMState_AWAIT_RESPONSE {
 		log.Debug().Msg("warning: no expecting a response")
 	}
 
@@ -993,7 +994,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 	case readWriteModel.APDUAbortExactly:
 		log.Debug().Msg("abort")
 
-		if err := s.setState(ABORTED, nil); err != nil {
+		if err := s.setState(SSMState_ABORTED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 
@@ -1004,7 +1005,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 		log.Debug().Msg("simple ack, error or reject")
 
 		// transaction completed
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 
@@ -1086,7 +1087,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 				if err := s.Response(apdu); err != nil {
 					log.Debug().Err(err).Msg("error sending response")
 				}
-				if err := s.setState(COMPLETED, nil); err != nil {
+				if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 					return errors.Wrap(err, "Error setting state to aborted")
 				}
 			} else {
@@ -1097,7 +1098,7 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 				if err := s.Response(segment); err != nil {
 					log.Debug().Err(err).Msg("error sending response")
 				}
-				if err := s.setState(SEGMENTED_RESPONSE, nil); err != nil {
+				if err := s.setState(SSMState_SEGMENTED_RESPONSE, nil); err != nil {
 					return errors.Wrap(err, "Error setting state to aborted")
 				}
 			}
@@ -1114,13 +1115,13 @@ func (s *ServerSSM) Confirmation(apdu readWriteModel.APDU) error {
 func (s *ServerSSM) processTask() error {
 	log.Debug().Msg("processTask")
 	switch s.state {
-	case SEGMENTED_REQUEST:
+	case SSMState_SEGMENTED_REQUEST:
 		return s.segmentedRequestTimeout()
-	case AWAIT_CONFIRMATION:
+	case SSMState_AWAIT_CONFIRMATION:
 		return s.awaitResponseTimeout()
-	case SEGMENTED_CONFIRMATION:
+	case SSMState_SEGMENTED_CONFIRMATION:
 		return s.segmentedResponseTimeout()
-	case COMPLETED, ABORTED:
+	case SSMState_COMPLETED, SSMState_ABORTED:
 		return nil
 	default:
 		return errors.Errorf("Invalid state %s", s.state)
@@ -1132,7 +1133,7 @@ func (s *ServerSSM) abort(reason readWriteModel.BACnetAbortReason) (readWriteMod
 	log.Debug().Msgf("abort\n%s", reason)
 
 	// change the state to aborted
-	if err := s.setState(ABORTED, nil); err != nil {
+	if err := s.setState(SSMState_ABORTED, nil); err != nil {
 		return nil, errors.Wrap(err, "Error setting state to aborted")
 	}
 
@@ -1200,7 +1201,7 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
 
 	// unsegmented request
 	if len(apduConfirmedRequest.GetSegment()) <= 0 {
-		if err := s.setState(AWAIT_RESPONSE, nil); err != nil {
+		if err := s.setState(SSMState_AWAIT_RESPONSE, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		return s.Request(apdu)
@@ -1228,7 +1229,7 @@ func (s *ServerSSM) idle(apdu readWriteModel.APDU) error {
 	// initialize the state
 	s.lastSequenceNumber = 0
 	s.initialSequenceNumber = 0
-	if err := s.setState(SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
+	if err := s.setState(SSMState_SEGMENTED_REQUEST, &s.segmentTimeout); err != nil {
 		return errors.Wrap(err, "Error setting state to aborted")
 	}
 
@@ -1243,7 +1244,7 @@ func (s *ServerSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 
 	// some kind of problem
 	if _, ok := apdu.(readWriteModel.APDUAbortExactly); ok {
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		return s.Response(apdu)
@@ -1312,7 +1313,7 @@ func (s *ServerSSM) segmentedRequest(apdu readWriteModel.APDU) error {
 
 		// forward the whole thing to the application
 		applicationTimeout := s.ssmSAP.GetApplicationTimeout()
-		if err := s.setState(AWAIT_RESPONSE, &applicationTimeout); err != nil {
+		if err := s.setState(SSMState_AWAIT_RESPONSE, &applicationTimeout); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		// TODO: here we need to rebuild again yada yada
@@ -1348,7 +1349,7 @@ func (s *ServerSSM) segmentedRequestTimeout() error {
 	log.Debug().Msg("segmentedRequestTimeout")
 
 	// give up
-	if err := s.setState(ABORTED, nil); err != nil {
+	if err := s.setState(SSMState_ABORTED, nil); err != nil {
 		return errors.Wrap(err, "Error setting state to aborted")
 	}
 	return nil
@@ -1364,7 +1365,7 @@ func (s *ServerSSM) awaitResponse(apdu readWriteModel.APDU) error {
 		log.Debug().Msg("client aborting this request")
 
 		// forward to the application
-		if err := s.setState(ABORTED, nil); err != nil {
+		if err := s.setState(SSMState_ABORTED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		if err := s.Request(apdu); err != nil { // send it ot the device
@@ -1410,7 +1411,7 @@ func (s *ServerSSM) segmentedResponse(apdu readWriteModel.APDU) error {
 		} else if s.sentAllSegments {
 			// final ack received?
 			log.Debug().Msg("all done sending response")
-			if err := s.setState(COMPLETED, nil); err != nil {
+			if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 				return errors.Wrap(err, "Error setting state to aborted")
 			}
 		} else {
@@ -1427,7 +1428,7 @@ func (s *ServerSSM) segmentedResponse(apdu readWriteModel.APDU) error {
 		}
 	// some kind of problem
 	case readWriteModel.APDUAbortExactly:
-		if err := s.setState(COMPLETED, nil); err != nil {
+		if err := s.setState(SSMState_COMPLETED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 		if err := s.Response(apdu); err != nil { // send it ot the application
@@ -1451,7 +1452,7 @@ func (s *ServerSSM) segmentedResponseTimeout() error {
 		}
 	} else {
 		// five up
-		if err := s.setState(ABORTED, nil); err != nil {
+		if err := s.setState(SSMState_ABORTED, nil); err != nil {
 			return errors.Wrap(err, "Error setting state to aborted")
 		}
 	}
@@ -1462,7 +1463,7 @@ type StateMachineAccessPoint struct {
 	*Client
 	*ServiceAccessPoint
 
-	localDevice           LocalDeviceObject
+	localDevice           *local.LocalDeviceObject
 	deviceInventory       *DeviceInfoCache
 	nextInvokeId          uint8
 	clientTransactions    []*ClientSSM
@@ -1478,7 +1479,7 @@ type StateMachineAccessPoint struct {
 	applicationTimeout    uint
 }
 
-func NewStateMachineAccessPoint(localDevice LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
+func NewStateMachineAccessPoint(localDevice *local.LocalDeviceObject, deviceInventory *DeviceInfoCache, sapID *int, cid *int) (*StateMachineAccessPoint, error) {
 	log.Debug().Msgf("NewStateMachineAccessPoint localDevice=%v deviceInventory=%v sap=%v cid=%v", localDevice, deviceInventory, sapID, cid)
 
 	s := &StateMachineAccessPoint{
@@ -1795,7 +1796,7 @@ func (s *StateMachineAccessPoint) GetDeviceInfoCache() *DeviceInfoCache {
 	return s.deviceInventory
 }
 
-func (s *StateMachineAccessPoint) GetLocalDevice() LocalDeviceObject {
+func (s *StateMachineAccessPoint) GetLocalDevice() *local.LocalDeviceObject {
 	return s.localDevice
 }
 
@@ -1860,7 +1861,7 @@ func (a *ApplicationServiceAccessPoint) Indication(apdu readWriteModel.APDU) err
 			log.Debug().Err(errorFound).Msg("got error")
 
 			// TODO: map it to a error... code temporary placeholder
-			a.Response(readWriteModel.NewAPDUReject(apdu.GetInvokeId(), nil, 0))
+			return a.Response(readWriteModel.NewAPDUReject(apdu.GetInvokeId(), nil, 0))
 		}
 	case readWriteModel.APDUUnconfirmedRequestExactly:
 		//assume no errors found
diff --git a/plc4go/internal/bacnetip/ApplicationModule.go b/plc4go/internal/bacnetip/ApplicationModule.go
index 41d0107507..b974e4eeda 100644
--- a/plc4go/internal/bacnetip/ApplicationModule.go
+++ b/plc4go/internal/bacnetip/ApplicationModule.go
@@ -23,6 +23,8 @@ import (
 	"bytes"
 	"encoding/binary"
 	"fmt"
+	"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
+	"github.com/apache/plc4x/plc4go/internal/bacnetip/service"
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
@@ -199,30 +201,223 @@ func (i *DeviceInfoCache) Release(deviceInfo DeviceInfo) error {
 	return nil
 }
 
-// TODO: implement
+// TODO: finish
 type Application struct {
-	ApplicationServiceElement
+	*ApplicationServiceElement
 	Collector
+
+	objectName       map[string]*local.LocalDeviceObject
+	objectIdentifier map[string]*local.LocalDeviceObject
+	localDevice      *local.LocalDeviceObject
+	deviceInfoCache  *DeviceInfoCache
+	controllers      map[string]interface{}
+	helpers          map[string]func(apdu readWriteModel.APDU) error
+}
+
+// NewApplication creates an Application bound to a new ApplicationServiceElement. When localDevice
+// is non-nil it is attached to the application and indexed by both its name and its identifier.
+func NewApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*Application, error) {
+	log.Debug().Msgf("NewApplication %v %s deviceInfoCache=%v aseID=%d", localDevice, localAddress, deviceInfoCache, aseID)
+	a := &Application{}
+	var err error
+	a.ApplicationServiceElement, err = NewApplicationServiceElement(aseID, a)
+	if err != nil {
+		return nil, err
+	}
+
+	// local objects by ID and name
+	a.objectName = map[string]*local.LocalDeviceObject{}
+	a.objectIdentifier = map[string]*local.LocalDeviceObject{}
+
+	// keep track of the local device
+	if localDevice != nil {
+		a.localDevice = localDevice
+
+		// bind the device object to this application
+		localDevice.App = a
+		// register it under its name and, separately, under its identifier
+		a.objectName[localDevice.ObjectName] = localDevice
+		a.objectIdentifier[localDevice.ObjectIdentifier] = localDevice
+	}
+
+	// use the provided cache or make a default one
+	if deviceInfoCache == nil {
+		deviceInfoCache = new(DeviceInfoCache)
+	}
+	a.deviceInfoCache = deviceInfoCache
+
+	// controllers for managing confirmed requests as a client
+	a.controllers = map[string]interface{}{}
+
+	// now set up the rest of the capabilities
+	a.Collector = Collector{}
+
+	// TODO: no idea how to handle the capabilities
+	return a, nil
+}
+
+func (a *Application) Request(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Request\n%s", apdu)
+
+	// only confirmed and unconfirmed requests may be sent; any other APDU kind is refused
+	switch apdu.(type) {
+	case readWriteModel.APDUUnconfirmedRequestExactly, readWriteModel.APDUConfirmedRequestExactly: // accepted, continue below
+	default:
+		return errors.New("APDU expected")
+	}
+	return a.ApplicationServiceElement.Request(apdu) // hand over to the embedded service element
 }
 
-// TODO: implement
-type IOController struct {
+// Indication dispatches apdu to the helper registered under "do_<dynamic type of apdu>".
+func (a *Application) Indication(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Indication\n%s", apdu)
+
+	// get a helper function
+	helperName := fmt.Sprintf("do_%T", apdu)
+	helperFn := a.helpers[helperName]
+	log.Debug().Msgf("helperFn: %s == %t", helperName, helperFn != nil)
+
+	// no helper: confirmed requests are reported as an error, anything else is silently ignored
+	if helperFn == nil {
+		if _, ok := apdu.(readWriteModel.APDUConfirmedRequestExactly); ok {
+			return errors.Errorf("no function %s", helperName)
+		}
+		return nil
+	}
+
+	if err := helperFn(apdu); err != nil {
+		log.Debug().Err(err).Msgf("err result")
+		// TODO: do proper mapping
+		return a.Response(readWriteModel.NewAPDUError(0, readWriteModel.BACnetConfirmedServiceChoice_CREATE_OBJECT, nil, 0))
+	}
+	return nil
 }
 
-// TODO: implement
+// TODO: finish
 type ApplicationIOController struct {
-	IOController
-	Application
+	*IOController
+	*Application
+	queueByAddress map[string]SieveQueue
 }
 
-func NewApplicationIOController(interface{}, interface{}, interface{}, *int) (*ApplicationIOController, error) {
-	return &ApplicationIOController{}, nil
+func NewApplicationIOController(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*ApplicationIOController, error) {
+	a := &ApplicationIOController{
+		// queues for each address
+		queueByAddress: make(map[string]SieveQueue),
+	}
+	var err error
+	a.IOController, err = NewIOController("", a)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating io controller")
+	}
+	a.Application, err = NewApplication(localDevice, localAddress, deviceInfoCache, aseID)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating application")
+	}
+	return a, nil
+}
+
+func (a *ApplicationIOController) ProcessIO(iocb _IOCB) error {
+	log.Debug().Msgf("ProcessIO %s", iocb)
+
+	// get the destination address from the pdu
+	destinationAddress := iocb.getDestination()
+	log.Debug().Msgf("destinationAddress %s", destinationAddress)
+
+	// look up the queue for that destination, creating and registering one on first use
+	queue, ok := a.queueByAddress[destinationAddress.String()]
+	if !ok {
+		newQueue, _ := NewSieveQueue(a._AppRequest, destinationAddress) // NOTE(review): second result is discarded; if it signals failure newQueue may be nil below — confirm
+		queue = *newQueue // the map holds SieveQueue values, so a copy is stored
+		a.queueByAddress[destinationAddress.String()] = queue
+	}
+	log.Debug().Msgf("queue %v", queue)
+
+	// ask the queue to process the request
+	return queue.RequestIO(iocb)
+}
+
+func (a *ApplicationIOController) _AppComplete(address net.Addr, apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("_AppComplete %s\n%s", address, apdu)
+	if address == nil { // callers without a known peer pass nil; String() on a nil net.Addr would panic
+		return nil // no queue can be registered for a nil address, so there is nothing to complete
+	}
+	// look up the queue
+	queue, ok := a.queueByAddress[address.String()]
+	if !ok {
+		log.Debug().Msgf("no queue for %s", address)
+		return nil
+	}
+	log.Debug().Msgf("queue %v", queue)
+	// make sure it has an active iocb
+	if queue.activeIOCB == nil {
+		log.Debug().Msgf("no active request for %s", address)
+		return nil
+	}
+	// this request is complete: acks complete the active IOCB, error/reject/abort abort it
+	switch apdu.(type) {
+	case readWriteModel.APDUSimpleAckExactly, readWriteModel.APDUComplexAckExactly:
+		queue.CompleteIO(queue.activeIOCB, apdu)
+	case readWriteModel.APDUErrorExactly, readWriteModel.APDURejectExactly, readWriteModel.APDUAbortExactly:
+		// TODO: extract error
+		queue.AbortIO(queue.activeIOCB, errors.Errorf("%s", apdu))
+	default:
+		return errors.New("unrecognized APDU type")
+	}
+	log.Debug().Msg("controller finished")
+	// if the queue is empty and idle, forget about the controller
+	if len(queue.ioQueue.queue) == 0 && queue.activeIOCB == nil {
+		delete(a.queueByAddress, address.String())
+	}
+	return nil
+}
+
+// _AppRequest is the queue callback that actually transmits apdu. It deliberately calls
+// Application.Request directly: ApplicationIOController.Request rejects everything but unconfirmed
+// requests, so routing through it would drop confirmed requests and send unconfirmed ones twice.
+func (a *ApplicationIOController) _AppRequest(apdu readWriteModel.APDU) {
+	log.Debug().Msgf("_AppRequest\n%s", apdu)
+
+	// send it downstream, bypass the guard
+	if err := a.Application.Request(apdu); err != nil {
+		log.Error().Err(err).Msg("Uh oh")
+		return
+	}
+
+	// if this was an unconfirmed request, it's complete, no message
+	if _, ok := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); ok {
+		// TODO: where to get the destination now again??
+		if err := a._AppComplete(nil, apdu); err != nil {
+			log.Debug().Err(err).Msg("error completing request")
+		}
+	}
+}
+
+// Request sends an unconfirmed request downstream via the embedded Application and returns an
+// error for every other APDU kind.
+func (a *ApplicationIOController) Request(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Request\n%s", apdu)
+
+	// only unconfirmed requests go straight downstream; everything else must use the IOCB interface
+	if _, unconfirmed := apdu.(readWriteModel.APDUUnconfirmedRequestExactly); unconfirmed {
+		return a.Application.Request(apdu)
+	}
+	return errors.New("use IOCB for confirmed requests")
+}
+
+func (a *ApplicationIOController) Confirmation(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Confirmation\n%s", apdu)
+
+	// this is an ack, error, reject or abort; hand it to _AppComplete to finish the pending request
+	// TODO: where to get the destination now again??
+	a._AppComplete(nil, apdu) // NOTE(review): the address is always nil and the returned error is discarded — confirm intent
+	return nil
 }
 
 type BIPSimpleApplication struct {
 	*ApplicationIOController
-	*WhoIsIAmServices
-	*ReadWritePropertyServices
+	*service.WhoIsIAmServices
+	*service.ReadWritePropertyServices
 	localAddress interface{}
 	asap         *ApplicationServiceAccessPoint
 	smap         *StateMachineAccessPoint
@@ -233,7 +428,7 @@ type BIPSimpleApplication struct {
 	mux          *UDPMultiplexer
 }
 
-func NewBIPSimpleApplication(localDevice LocalDeviceObject, localAddress *net.UDPAddr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
+func NewBIPSimpleApplication(localDevice *local.LocalDeviceObject, localAddress net.Addr, deviceInfoCache *DeviceInfoCache, aseID *int) (*BIPSimpleApplication, error) {
 	b := &BIPSimpleApplication{}
 	var err error
 	b.ApplicationIOController, err = NewApplicationIOController(localDevice, localAddress, deviceInfoCache, aseID)
diff --git a/plc4go/internal/bacnetip/Device.go b/plc4go/internal/bacnetip/Device.go
deleted file mode 100644
index 33c36205e6..0000000000
--- a/plc4go/internal/bacnetip/Device.go
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package bacnetip
-
-import readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
-
-type WhoIsIAmServices struct {
-}
-
-func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
-	// TODO: implement me
-	return nil, nil
-}
-
-var defaultMaxApduLength = readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024
-var defaultMaxSegmentsAccepted = readWriteModel.MaxSegmentsAccepted_NUM_SEGMENTS_16
-
-// _LocalDeviceObjectDefault is a device entry with default entries
-var _LocalDeviceObjectDefault = LocalDeviceObject{
-	MaximumApduLengthAccepted: &defaultMaxApduLength,
-	SegmentationSupported:     readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
-	MaxSegmentsAccepted:       &defaultMaxSegmentsAccepted,
-	APDUSegmentTimeout:        5000,
-	APDUTimeout:               3000,
-	NumberOfAPDURetries:       3,
-}
-
-type LocalDeviceObject struct {
-	NumberOfAPDURetries       uint
-	APDUTimeout               uint
-	SegmentationSupported     readWriteModel.BACnetSegmentation
-	APDUSegmentTimeout        uint
-	MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
-	MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
-}
-
-func NewLocalDeviceObject() *LocalDeviceObject {
-	return &LocalDeviceObject{
-		NumberOfAPDURetries:       _LocalDeviceObjectDefault.NumberOfAPDURetries,
-		APDUTimeout:               _LocalDeviceObjectDefault.APDUTimeout,
-		SegmentationSupported:     _LocalDeviceObjectDefault.SegmentationSupported,
-		APDUSegmentTimeout:        _LocalDeviceObjectDefault.APDUSegmentTimeout,
-		MaxSegmentsAccepted:       _LocalDeviceObjectDefault.MaxSegmentsAccepted,
-		MaximumApduLengthAccepted: _LocalDeviceObjectDefault.MaximumApduLengthAccepted,
-	}
-}
diff --git a/plc4go/internal/bacnetip/Driver.go b/plc4go/internal/bacnetip/Driver.go
index 330939d8bb..641a81ad37 100644
--- a/plc4go/internal/bacnetip/Driver.go
+++ b/plc4go/internal/bacnetip/Driver.go
@@ -112,18 +112,18 @@ func (m *Driver) DiscoverWithContext(ctx context.Context, callback func(event ap
 
 type ApplicationManager struct {
 	sync.Mutex
-	applications map[string]spi.MessageCodec
+	applications map[string]*ApplicationLayerMessageCodec
 }
 
-func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (spi.MessageCodec, error) {
+func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Transport, transportUrl url.URL, options map[string][]string) (*ApplicationLayerMessageCodec, error) {
 	var localAddress *net.UDPAddr
+	var remoteAddr *net.UDPAddr
 	{
 		host := transportUrl.Host
 		port := transportUrl.Port()
 		if transportUrl.Port() == "" {
 			port = options["defaultUdpPort"][0]
 		}
-		var remoteAddr *net.UDPAddr
 		if resolvedRemoteAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%s", host, port)); err != nil {
 			panic(err)
 		} else {
@@ -141,7 +141,7 @@ func (a *ApplicationManager) getApplicationLayerMessageCode(transport *udp.Trans
 	defer a.Unlock()
 	messageCodec, ok := a.applications[localAddress.String()]
 	if !ok {
-		newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress)
+		newMessageCodec, err := NewApplicationLayerMessageCodec(transport, transportUrl, options, localAddress, remoteAddr)
 		if err != nil {
 			return nil, errors.Wrap(err, "error creating application layer code")
 		}
diff --git a/plc4go/internal/bacnetip/IOCBModule.go b/plc4go/internal/bacnetip/IOCBModule.go
new file mode 100644
index 0000000000..068ed17818
--- /dev/null
+++ b/plc4go/internal/bacnetip/IOCBModule.go
@@ -0,0 +1,589 @@
+package bacnetip
+
+import (
+	"container/heap"
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+	"github.com/apache/plc4x/plc4go/spi/plcerrors"
+	"github.com/apache/plc4x/plc4go/spi/utils"
+	"github.com/pkg/errors"
+	"github.com/rs/zerolog/log"
+	"net"
+	"sync"
+	"time"
+)
+
+type IOCBState int
+
+const (
+	IOCBState_IDLE IOCBState = iota
+	IOCBState_PENDING
+	IOCBState_ACTIVE
+	IOCBState_COMPLETED
+	IOCBState_ABORTED
+)
+
+func (i IOCBState) String() string {
+	switch i {
+	case IOCBState_IDLE:
+		return "IDLE"
+	case IOCBState_PENDING:
+		return "PENDING"
+	case IOCBState_ACTIVE:
+		return "ACTIVE"
+	case IOCBState_COMPLETED:
+		return "COMPLETED"
+	case IOCBState_ABORTED:
+		return "ABORTED"
+	default:
+		return "Unknown"
+	}
+}
+
+type IOQControllerStates int
+
+const (
+	IOQControllerStates_CTRL_IDLE IOQControllerStates = iota
+	IOQControllerStates_CTRL_ACTIVE
+	IOQControllerStates_CTRL_WAITING
+)
+
+func (i IOQControllerStates) String() string {
+	switch i {
+	case IOQControllerStates_CTRL_IDLE:
+		return "IDLE"
+	case IOQControllerStates_CTRL_ACTIVE:
+		return "ACTIVE"
+	case IOQControllerStates_CTRL_WAITING:
+		return "WAITING"
+	default:
+		return "Unknown"
+	}
+}
+
+type _IOCB interface {
+	setIOController(ioController _IOController)
+	setIOState(newState IOCBState)
+	getIOState() IOCBState
+	setIOResponse(msg readWriteModel.APDU)
+	Trigger()
+	setIOError(err error)
+	getRequest() readWriteModel.APDU
+	getDestination() net.Addr
+	getPriority() int
+	clearQueue()
+	Abort(err error) error
+}
+
+var _identNext = 1
+var _identLock sync.Mutex
+
+type IOCB struct {
+	ioID           int
+	request        readWriteModel.APDU
+	destination    net.Addr
+	ioState        IOCBState
+	ioResponse     readWriteModel.APDU
+	ioError        error
+	ioController   _IOController
+	ioComplete     sync.Cond
+	ioCompleteDone bool
+	ioCallback     []func()
+	ioQueue        []_IOCB
+	ioTimeout      *time.Timer
+	ioTimoutCancel chan interface{}
+	priority       int
+}
+
+func NewIOCB(request readWriteModel.APDU, destination net.Addr) (*IOCB, error) {
+	// lock the identity sequence number
+	_identLock.Lock()
+
+	// generate a unique identity for this block
+	ioID := _identNext
+	_identNext++
+
+	// release the lock
+	_identLock.Unlock()
+
+	//  debugging postponed until ID acquired
+	log.Debug().Msgf("NewIOCB(%d)", ioID)
+
+	return &IOCB{
+		// save the ID
+		ioID: ioID,
+
+		// save the request parameter
+		request:     request,
+		destination: destination,
+
+		// start with an idle request
+		ioState: IOCBState_IDLE,
+	}, nil
+}
+
+// AddCallback Pass a function to be called when IO is complete.
+func (i *IOCB) AddCallback(fn func()) {
+	log.Debug().Msgf("AddCallback(%d): %t", i.ioID, fn != nil)
+	// store it
+	i.ioCallback = append(i.ioCallback, fn)
+
+	// already complete?
+	if i.ioCompleteDone {
+		i.Trigger()
+	}
+}
+
+// Wait for the completion event to be set
+func (i *IOCB) Wait() {
+	log.Debug().Msgf("Wait(%d)", i.ioID)
+	i.ioComplete.Wait()
+}
+
+// Trigger Set the completion event and make the callback(s)
+func (i *IOCB) Trigger() {
+	log.Debug().Msgf("Trigger(%d)", i.ioID)
+
+	// if it's queued, remove it from its queue
+	myIndex := -1
+	var meAsInterface _IOCB = i
+	for index, qe := range i.ioQueue {
+		if qe == meAsInterface {
+			myIndex = index
+		}
+	}
+	if myIndex >= 0 {
+		log.Debug().Msg("dequeue")
+		i.ioQueue = append(i.ioQueue[:myIndex], i.ioQueue[myIndex+1:]...)
+	}
+
+	// if there's a timer, cancel it
+	if i.ioTimeout != nil {
+		log.Debug().Msg("cancel timeout")
+		i.ioTimeout.Stop()
+	}
+
+	// set the completion event; the flag lets AddCallback detect that completion already happened
+	i.ioCompleteDone = true
+	i.ioComplete.Broadcast()
+	log.Debug().Msg("complete event set")
+	// make callback(s)
+	for _, f := range i.ioCallback {
+		f()
+	}
+}
+
+// Complete Called to complete a transaction, usually when ProcessIO has shipped the IOCB off to some other thread or
+//        function.
+func (i *IOCB) Complete(apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("Complete(%d)\n%s", i.ioID, apdu)
+
+	if i.ioController != nil {
+		// pass to the controller
+		return i.ioController.CompleteIO(i, apdu)
+	} else {
+		// just fill in the data
+		i.ioState = IOCBState_COMPLETED
+		i.ioResponse = apdu
+		i.Trigger()
+		return nil
+	}
+}
+
+// Abort Called by a client to abort a transaction. Closes the timeout-cancel channel when SetTimeout has created one.
+func (i *IOCB) Abort(err error) error {
+	log.Debug().Err(err).Msgf("Abort(%d)", i.ioID)
+	if i.ioTimoutCancel != nil { // closing a nil channel would panic
+		defer close(i.ioTimoutCancel)
+	}
+	if i.ioController != nil {
+		// pass to the controller
+		return i.ioController.AbortIO(i, err)
+	}
+	// no controller: just fill in the data
+	i.ioState = IOCBState_ABORTED
+	i.ioError = err
+	i.Trigger()
+	return nil
+}
+
+// SetTimeout Called to set a transaction timer. When the timer fires the IOCB is aborted with a timeout error.
+func (i *IOCB) SetTimeout(delay time.Duration) {
+	// if one has already been created, re-arm it with the new delay
+	if i.ioTimeout != nil {
+		i.ioTimeout.Reset(delay)
+	} else {
+		now := time.Now() // start of the wait; the timeout error reports the time elapsed since then
+		i.ioTimeout = time.NewTimer(delay)
+		i.ioTimoutCancel = make(chan interface{})
+		go func() {
+			select {
+			case timeout := <-i.ioTimeout.C:
+				_ = i.Abort(plcerrors.NewTimeoutError(timeout.Sub(now)))
+			case <-i.ioTimoutCancel:
+			}
+		}()
+	}
+}
+
+func (i *IOCB) setIOController(ioController _IOController) {
+	i.ioController = ioController
+}
+
+func (i *IOCB) setIOState(newState IOCBState) {
+	i.ioState = newState
+}
+
+func (i *IOCB) getIOState() IOCBState {
+	return i.ioState
+}
+
+func (i *IOCB) setIOResponse(msg readWriteModel.APDU) {
+	i.ioResponse = msg
+}
+
+func (i *IOCB) setIOError(err error) {
+	i.ioError = err
+}
+
+func (i *IOCB) getRequest() readWriteModel.APDU {
+	return i.request
+}
+
+func (i *IOCB) getDestination() net.Addr {
+	return i.destination
+}
+
+func (i *IOCB) getPriority() int {
+	return i.priority
+}
+
+func (i *IOCB) clearQueue() {
+	i.ioQueue = nil
+}
+
+// A PriorityItem is something we manage in a priority queue.
+type PriorityItem struct {
+	value    _IOCB // The value of the item; arbitrary.
+	priority int   // The priority of the item in the queue.
+	// The index is needed by update and is maintained by the heap.Interface methods.
+	index int // The index of the item in the heap.
+}
+
+// A PriorityQueue implements heap.Interface and holds Items.
+type PriorityQueue []*PriorityItem
+
+func (pq PriorityQueue) Len() int { return len(pq) }
+
+func (pq PriorityQueue) Less(i, j int) bool {
+	// We want Pop to give us the highest, not lowest, priority so we use greater than here.
+	return pq[i].priority > pq[j].priority
+}
+
+func (pq PriorityQueue) Swap(i, j int) {
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].index = i
+	pq[j].index = j
+}
+
+func (pq *PriorityQueue) Push(x any) {
+	n := len(*pq)
+	item := x.(*PriorityItem)
+	item.index = n
+	*pq = append(*pq, item)
+}
+
+func (pq *PriorityQueue) Pop() any {
+	old := *pq
+	n := len(old)
+	item := old[n-1]
+	old[n-1] = nil  // avoid memory leak
+	item.index = -1 // for safety
+	*pq = old[0 : n-1]
+	return item
+}
+
+// update modifies the priority and value of an Item in the queue.
+func (pq *PriorityQueue) update(item *PriorityItem, value _IOCB, priority int) {
+	item.value = value
+	item.priority = priority
+	heap.Fix(pq, item.index)
+}
+
+type IOQueue struct {
+	notEmpty sync.Cond
+	queue    PriorityQueue
+}
+
+func NewIOQueue(name string) *IOQueue {
+	log.Debug().Msgf("NewIOQueue %s", name)
+	return &IOQueue{}
+}
+
+// Put queues an IOCB by priority and signals notEmpty. It returns an error unless the IOCB is in state
+// IOCBState_PENDING.
+func (i *IOQueue) Put(iocb _IOCB) error {
+	log.Debug().Msgf("Put %s", iocb)
+
+	// requests should be pending before being queued
+	if iocb.getIOState() != IOCBState_PENDING {
+		return errors.New("invalid state transition")
+	}
+
+	// PriorityQueue.Push type-asserts its argument to *PriorityItem, so a pointer has to be pushed here;
+	// pushing a PriorityItem value makes that assertion panic
+	item := &PriorityItem{value: iocb, priority: iocb.getPriority()}
+	heap.Push(&i.queue, item)
+
+	i.notEmpty.Broadcast()
+	return nil
+}
+
+// Get pops the highest-priority request; with block set it waits (at most *delay if given). Returns nil, nil when empty.
+func (i *IOQueue) Get(block bool, delay *time.Duration) (_IOCB, error) {
+	log.Debug().Msgf("Get block=%t, delay=%s", block, delay)
+
+	// if the queue is empty, and we do not block return nil
+	if !block && len(i.queue) == 0 {
+		log.Debug().Msgf("not blocking and empty")
+		return nil, nil
+	}
+
+	// wait for something to be in the queue
+	if len(i.queue) == 0 {
+		if delay != nil {
+			gotSomething := make(chan interface{})
+			go func() {
+				i.notEmpty.Wait()
+				close(gotSomething)
+			}()
+			timeout := time.NewTimer(*delay)
+			defer utils.CleanupTimer(timeout)
+			select {
+			case <-gotSomething:
+			case <-timeout.C:
+				return nil, nil
+			}
+		} else {
+			i.notEmpty.Wait()
+		}
+	}
+
+	if len(i.queue) == 0 {
+		return nil, nil
+	}
+
+	// extract the first element; PriorityQueue.Pop hands back the stored *PriorityItem
+	pi := heap.Pop(&i.queue).(*PriorityItem)
+	iocb := pi.value
+	iocb.clearQueue()
+
+	// return the request
+	return iocb, nil
+}
+
+// Remove a control block from the queue, called if the request
+//        is canceled/aborted
+func (i *IOQueue) Remove(iocb _IOCB) error {
+	for _, item := range i.queue {
+		if iocb == item.value {
+			heap.Remove(&i.queue, item.index)
+
+			if len(i.queue) == 0 {
+				i.notEmpty.Broadcast()
+			}
+			return nil
+		}
+	}
+	return nil
+}
+
+// Abort all the control blocks in the queue
+func (i *IOQueue) Abort(err error) {
+	for _, item := range i.queue {
+		item.value.clearQueue()
+		_ = item.value.Abort(err)
+	}
+
+	//
+	i.queue = nil
+
+	// the queue is now empty, clear the event
+	i.notEmpty.Broadcast()
+}
+
+type _IOController interface {
+	Abort(err error) error
+	ProcessIO(iocb _IOCB) error
+	CompleteIO(iocb _IOCB, apdu readWriteModel.APDU) error
+	AbortIO(iocb _IOCB, err error) error
+}
+
+type IOController struct {
+	name       string
+	rootStruct _IOController
+}
+
+func NewIOController(name string, rootStruct _IOController) (*IOController, error) {
+	log.Debug().Msgf("NewIOController name=%s", name)
+	return &IOController{
+		// save the name
+		name:       name,
+		rootStruct: rootStruct,
+	}, nil
+}
+
+// Abort all requests, no default implementation.
+func (i *IOController) Abort(err error) error {
+	return nil
+}
+
+// RequestIO Called by a client to start processing a request.
+func (i *IOController) RequestIO(iocb _IOCB) error {
+	log.Debug().Msgf("RequestIO\n%s", iocb)
+
+	// bind the iocb to this controller
+	iocb.setIOController(i)
+
+	// hopefully there won't be an error
+	var err error
+
+	// change the state
+	iocb.setIOState(IOCBState_PENDING)
+
+	// let derived class figure out how to process this
+	err = i.rootStruct.ProcessIO(iocb)
+
+	// if there was an error, abort the request
+	if err != nil {
+		return i.rootStruct.AbortIO(iocb, err)
+	}
+	return nil
+}
+
+// ProcessIO Figure out how to respond to this request.  This must be provided by the derived class.
+func (i *IOController) ProcessIO(iocb _IOCB) error {
+	return errors.New("IOController must implement process_io()")
+}
+
+// ActiveIO Called by a handler to notify the controller that a request is being processed
+func (i *IOController) ActiveIO(iocb _IOCB) error {
+	log.Debug().Msgf("ActiveIO %s", iocb)
+
+	// requests should be idle or pending before coming active
+	if iocb.getIOState() != IOCBState_IDLE && iocb.getIOState() != IOCBState_PENDING {
+		return errors.Errorf("invalid state transition (currently %d)", iocb.getIOState())
+	}
+
+	// change the state
+	iocb.setIOState(IOCBState_ACTIVE)
+	return nil
+}
+
+// CompleteIO Called by a handler to return data to the client
+func (i *IOController) CompleteIO(iocb _IOCB, apdu readWriteModel.APDU) error {
+	log.Debug().Msgf("CompleteIO %s\n%s", iocb, apdu)
+
+	// if it completed, leave it alone
+	if iocb.getIOState() == IOCBState_COMPLETED {
+		return nil
+	}
+
+	// if it already aborted, leave it alone
+	if iocb.getIOState() == IOCBState_ABORTED {
+		return nil
+	}
+
+	// change the state
+	iocb.setIOState(IOCBState_COMPLETED)
+	iocb.setIOResponse(apdu)
+
+	// notify the client
+	iocb.Trigger()
+
+	return nil
+}
+
+// AbortIO Called by a handler or a client to abort a transaction
+func (i *IOController) AbortIO(iocb _IOCB, err error) error {
+	log.Debug().Err(err).Msgf("AbortIO %s", iocb)
+
+	// if it completed, leave it alone
+	if iocb.getIOState() == IOCBState_COMPLETED {
+		return nil
+	}
+
+	// if it already aborted, leave it alone
+	if iocb.getIOState() == IOCBState_ABORTED {
+		return nil
+	}
+
+	// change the state
+	iocb.setIOState(IOCBState_ABORTED)
+	iocb.setIOError(err)
+
+	// notify the client
+	iocb.Trigger()
+
+	return nil
+}
+
+type IOQController struct {
+	*IOController
+	state      IOQControllerStates
+	activeIOCB _IOCB
+	ioQueue    *IOQueue
+}
+
+func NewIOQController(name string) (*IOQController, error) {
+	i := &IOQController{}
+	var err error
+	i.IOController, err = NewIOController(name, i)
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating IO controller")
+	}
+
+	// start idle
+	i.state = IOQControllerStates_CTRL_IDLE
+	log.Debug().Msgf("%s %s %s", time.Now(), name, i.state)
+
+	// no active iocb
+	i.activeIOCB = nil
+
+	// create an IOQueue for iocb's requested when not idle
+	i.ioQueue = NewIOQueue(name + " queue")
+
+	return i, nil
+}
+
+// TODO: implement functions of IOQController
+
+type SieveQueue struct {
+	*IOQController
+	requestFn func(apdu readWriteModel.APDU)
+	address   net.Addr
+}
+
+func NewSieveQueue(fn func(apdu readWriteModel.APDU), address net.Addr) (*SieveQueue, error) {
+	s := &SieveQueue{}
+	var err error
+	s.IOQController, err = NewIOQController(address.String())
+	if err != nil {
+		return nil, errors.Wrap(err, "error creating a IOQController")
+	}
+
+	// Save a reference to the request function
+	s.requestFn = fn
+	s.address = address
+	return s, nil
+}
+
+func (s *SieveQueue) ProcessIO(iocb _IOCB) error {
+	log.Debug().Msgf("ProcessIO %s", iocb)
+
+	// this is now an active request
+	s.ActiveIO(iocb)
+
+	// send the request
+	s.requestFn(iocb.getRequest())
+	return nil
+}
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 670cc00ddc..45b51af713 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -21,6 +21,7 @@ package bacnetip
 
 import (
 	"context"
+	"github.com/apache/plc4x/plc4go/internal/bacnetip/local"
 	"github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/apache/plc4x/plc4go/spi"
 	"github.com/apache/plc4x/plc4go/spi/default"
@@ -38,17 +39,23 @@ type ApplicationLayerMessageCodec struct {
 	bipSimpleApplication *BIPSimpleApplication
 	messageCode          *MessageCodec
 	deviceInfoCache      DeviceInfoCache
+
+	localAddress  *net.UDPAddr
+	remoteAddress *net.UDPAddr
 }
 
-func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
+func NewApplicationLayerMessageCodec(udpTransport *udp.Transport, transportUrl url.URL, options map[string][]string, localAddress *net.UDPAddr, remoteAddress *net.UDPAddr) (*ApplicationLayerMessageCodec, error) {
 	// Have the transport create a new transport-instance.
 	transportInstance, err := udpTransport.CreateTransportInstanceForLocalAddress(transportUrl, options, localAddress)
 	if err != nil {
 		return nil, errors.Wrap(err, "error creating transport instance")
 	}
 	_ = transportInstance
-	a := &ApplicationLayerMessageCodec{}
-	application, err := NewBIPSimpleApplication(LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
+	a := &ApplicationLayerMessageCodec{
+		localAddress:  localAddress,
+		remoteAddress: remoteAddress,
+	}
+	application, err := NewBIPSimpleApplication(&local.LocalDeviceObject{}, localAddress, &a.deviceInfoCache, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -81,7 +88,22 @@ func (m *ApplicationLayerMessageCodec) IsRunning() bool {
 }
 
 func (m *ApplicationLayerMessageCodec) Send(message spi.Message) error {
-	panic("not yet mapped")
+	iocb, err := NewIOCB(message.(model.APDU), m.remoteAddress)
+	if err != nil {
+		return errors.Wrap(err, "error creating IOCB")
+	}
+	go func() {
+		go m.bipSimpleApplication.RequestIO(iocb)
+		iocb.Wait()
+		if iocb.ioError != nil {
+			// TODO: handle error
+		} else if iocb.ioResponse != nil {
+			// TODO: response?
+		} else {
+			// TODO: what now?
+		}
+	}()
+	return nil
 }
 
 func (m *ApplicationLayerMessageCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
diff --git a/plc4go/internal/bacnetip/local/Device.go b/plc4go/internal/bacnetip/local/Device.go
new file mode 100644
index 0000000000..752013a2ff
--- /dev/null
+++ b/plc4go/internal/bacnetip/local/Device.go
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package local
+
+import (
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+)
+
+type LocalDeviceObject struct {
+	NumberOfAPDURetries       uint
+	APDUTimeout               uint
+	SegmentationSupported     readWriteModel.BACnetSegmentation
+	APDUSegmentTimeout        uint
+	MaxSegmentsAccepted       *readWriteModel.MaxSegmentsAccepted
+	MaximumApduLengthAccepted *readWriteModel.MaxApduLengthAccepted
+	App                       interface{}
+	ObjectName                string
+	ObjectIdentifier          string
+}
diff --git a/plc4go/internal/bacnetip/service/Device.go b/plc4go/internal/bacnetip/service/Device.go
new file mode 100644
index 0000000000..ae8e18ff9b
--- /dev/null
+++ b/plc4go/internal/bacnetip/service/Device.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package service
+
+type WhoIsIAmServices struct {
+}
+
+func NewWhoIsIAmServices() (*WhoIsIAmServices, error) {
+	// TODO: implement me
+	return nil, nil
+}
diff --git a/plc4go/internal/bacnetip/service/Object.go b/plc4go/internal/bacnetip/service/Object.go
new file mode 100644
index 0000000000..946f5d5fa5
--- /dev/null
+++ b/plc4go/internal/bacnetip/service/Object.go
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package service
+
+type ReadWritePropertyServices struct {
+}
+
+func NewReadWritePropertyServices() (*ReadWritePropertyServices, error) {
+	// TODO: implement me
+	return nil, nil
+}