You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/11/09 15:40:29 UTC

[plc4x] 02/02: feat(plc4go/bacnet): partial transaction state machine

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 863eb4e22a6e960c630ce144232f80786b773696
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Nov 9 16:40:12 2022 +0100

    feat(plc4go/bacnet): partial transaction state machine
    
    + ported code from bacpypes
---
 plc4go/internal/bacnetip/DeviceInventory.go        |  26 +-
 plc4go/internal/bacnetip/MessageCodec.go           |   5 +-
 .../internal/bacnetip/TransactionStateMachine.go   | 375 +++++++++++++++++-
 .../bacnetip/TransactionStateMachine_test.go       | 429 +++++++++++++++++++++
 4 files changed, 803 insertions(+), 32 deletions(-)

diff --git a/plc4go/internal/bacnetip/DeviceInventory.go b/plc4go/internal/bacnetip/DeviceInventory.go
index a067017775..c2beee7b7c 100644
--- a/plc4go/internal/bacnetip/DeviceInventory.go
+++ b/plc4go/internal/bacnetip/DeviceInventory.go
@@ -22,7 +22,6 @@ package bacnetip
 import (
 	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
 	"github.com/pkg/errors"
-	"math"
 	"sync"
 	"time"
 )
@@ -44,18 +43,25 @@ func (d *DeviceInventory) getEntryForDestination(destination []uint8) (DeviceEnt
 }
 
 var NoDeviceEntry = DeviceEntry{
-	MaximumApduLengthAcceptedLength: readWriteModel.NewBACnetTagPayloadUnsignedInteger(nil, nil, nil, nil, nil, nil, nil, func() *uint64 {
-		var maxUint64 uint64 = math.MaxUint64
-		return &maxUint64
-	}(), 4),
+	DeviceIdentifier:          nil,
+	MaximumApduLengthAccepted: readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1024,
+	SegmentationSupported:     readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
+	MaxSegmentsAccepted:       16,
+	APDUSegmentTimeout:        5000,
+	APDUTimeout:               3000,
+	NumberOfAPDURetries:       3,
 }
 
 type DeviceEntry struct {
-	DeviceIdentifier                readWriteModel.BACnetTagPayloadObjectIdentifier
-	MaximumApduLengthAcceptedLength readWriteModel.BACnetTagPayloadUnsignedInteger
-	SegmentationSupported           bool
-	VendorId                        readWriteModel.BACnetVendorId
-	DeviceObjects                   []DeviceObject
+	DeviceIdentifier          readWriteModel.BACnetTagPayloadObjectIdentifier
+	MaximumApduLengthAccepted readWriteModel.MaxApduLengthAccepted
+	SegmentationSupported     readWriteModel.BACnetSegmentation
+	MaxSegmentsAccepted       readWriteModel.MaxSegmentsAccepted
+	APDUSegmentTimeout        uint
+	APDUTimeout               uint
+	NumberOfAPDURetries       uint
+	VendorId                  readWriteModel.BACnetVendorId
+	DeviceObjects             []DeviceObject
 }
 
 func (d DeviceEntry) GetDeviceObjects(filter ...DeviceObjectFilter) []DeviceObject {
diff --git a/plc4go/internal/bacnetip/MessageCodec.go b/plc4go/internal/bacnetip/MessageCodec.go
index 8675796d87..528884a7e8 100644
--- a/plc4go/internal/bacnetip/MessageCodec.go
+++ b/plc4go/internal/bacnetip/MessageCodec.go
@@ -35,10 +35,7 @@ type ApplicationLayerMessageCodec struct {
 
 func NewApplicationLayerMessageCodec(transportInstance transports.TransportInstance, deviceInventory *DeviceInventory) *ApplicationLayerMessageCodec {
 	return &ApplicationLayerMessageCodec{
-		TransactionStateMachine{
-			MessageCodec:    NewMessageCodec(transportInstance),
-			deviceInventory: deviceInventory,
-		},
+		NewTransactionStateMachine(NewMessageCodec(transportInstance), deviceInventory),
 	}
 }
 
diff --git a/plc4go/internal/bacnetip/TransactionStateMachine.go b/plc4go/internal/bacnetip/TransactionStateMachine.go
index 21b0c3147b..68cc443ef2 100644
--- a/plc4go/internal/bacnetip/TransactionStateMachine.go
+++ b/plc4go/internal/bacnetip/TransactionStateMachine.go
@@ -44,6 +44,23 @@ type TransactionStateMachine struct {
 	RequestTimer          int
 }
 
+func NewTransactionStateMachine(messageCodec *MessageCodec, deviceInventory *DeviceInventory) TransactionStateMachine {
+	return TransactionStateMachine{
+		MessageCodec:          messageCodec,
+		deviceInventory:       deviceInventory,
+		retryCount:            3,
+		segmentRetryCount:     3,
+		duplicateCount:        0,
+		sentAllSegments:       false,
+		lastSequenceNumber:    0,
+		initialSequenceNumber: 0,
+		actualWindowSize:      0,
+		proposeWindowSize:     2,
+		segmentTimer:          1500,
+		RequestTimer:          3000,
+	}
+}
+
 func (t *TransactionStateMachine) GetCodec() spi.MessageCodec {
 	return t
 }
@@ -89,7 +106,7 @@ func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (ha
 			return false, nil
 		}
 		if npdu.GetControl().GetMessageTypeFieldPresent() {
-			log.Trace().Msg("No message type field present")
+			log.Trace().Msg("Message type field present")
 			return false, nil
 		}
 		var entryForDestination = NoDeviceEntry
@@ -106,27 +123,12 @@ func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (ha
 		apdu := npdu.GetApdu()
 		switch apdu := apdu.(type) {
 		case readWriteModel.APDUConfirmedRequestExactly:
+			// TODO: this is a "client" request
 			// TODO: check if adpu length is the magic number (it should be "unencoded")
-			maximumApduLengthForDevice := entryForDestination.MaximumApduLengthAcceptedLength.GetActualValue()
-			apduLengthDoesOverflow := uint64(apdu.GetLengthInBytes()) > maximumApduLengthForDevice
-			if apduLengthDoesOverflow && !entryForDestination.SegmentationSupported {
-				return false, errors.Errorf("We have a overflow. We need %d bytes, but device only supports a max of %d and no segmentation", apdu.GetLengthInBytes(), maximumApduLengthForDevice)
-			}
-			// TODO: handle potential retry
-			if apduLengthDoesOverflow {
-				// TODO: handle potential segmentation
-			}
 			return false, nil
 		case readWriteModel.APDUComplexAckExactly:
+			// TODO: this is a "server" response
 			// TODO: check if adpu length is the magic number (it should be "unencoded")
-			maximumApduLengthForDevice := entryForDestination.MaximumApduLengthAcceptedLength.GetActualValue()
-			apduLengthDoesOverflow := uint64(apdu.GetLengthInBytes()) > maximumApduLengthForDevice
-			if apduLengthDoesOverflow && !entryForDestination.SegmentationSupported {
-				return false, errors.Errorf("We have a overflow. We need %d bytes, but device only supports a max of %d and no segmentation", apdu.GetLengthInBytes(), maximumApduLengthForDevice)
-			}
-			if apduLengthDoesOverflow {
-				// TODO: handle potential segmentation
-			}
 			return false, nil
 		default:
 			log.Trace().Msgf("APDU type not relevant %T present", apdu)
@@ -137,3 +139,340 @@ func (t *TransactionStateMachine) handleOutboundMessage(message spi.Message) (ha
 		return false, nil
 	}
 }
+
+// TODO: this is a placeholder for a tasking framework
+type _Task struct {
+	taskTime    time.Time
+	isScheduled bool
+}
+
+func (t *_Task) installTask(when *time.Time, delta *time.Duration) {
+	// TODO: schedule task
+}
+
+func (t *_Task) suspendTask() {
+	// TODO: suspend task
+}
+
+func (t *_Task) resume() {
+	// TODO: resume task
+}
+
+type OneShotTask struct {
+	_Task
+}
+
+// TODO: this is the interface to the outside for the SSM
+type ServiceAccessPoint interface {
+	GetDeviceInventory() *DeviceInventory
+	GetLocalDevice() DeviceEntry
+	GetProposedWindowSize() uint8
+	Request(apdu readWriteModel.APDU)
+	// TODO: wrap that properly
+	GetClientTransactions() []interface{}
+}
+
+type SSMState uint8
+
+const (
+	IDLE SSMState = iota
+	SEGMENTED_REQUEST
+	AWAIT_CONFIRMATION
+	AWAIT_RESPONSE
+	SEGMENTED_RESPONSE
+	SEGMENTED_CONFIRMATION
+	COMPLETED
+	ABORTED
+)
+
+func (s SSMState) String() string {
+	switch s {
+	case IDLE:
+		return "IDLE"
+	case SEGMENTED_REQUEST:
+		return "SEGMENTED_REQUEST"
+	case AWAIT_CONFIRMATION:
+		return "AWAIT_CONFIRMATION"
+	case AWAIT_RESPONSE:
+		return "AWAIT_RESPONSE"
+	case SEGMENTED_RESPONSE:
+		return "SEGMENTED_RESPONSE"
+	case SEGMENTED_CONFIRMATION:
+		return "SEGMENTED_CONFIRMATION"
+	case COMPLETED:
+		return "COMPLETED"
+	case ABORTED:
+		return "ABORTED"
+	default:
+		return "Unknown"
+	}
+}
+
+type segmentAPDU struct {
+	originalApdu     readWriteModel.APDU
+	originalInvokeId uint8
+	serviceBytes     []byte
+	serviceChoice    readWriteModel.BACnetConfirmedServiceChoice
+	isAck            bool
+}
+
+// SSM - Segmentation State Machine
+type SSM struct {
+	OneShotTask
+
+	ssmSAP ServiceAccessPoint
+
+	pduAddress  []byte
+	deviceEntry DeviceEntry
+
+	invokeId uint8
+
+	state        SSMState
+	segmentAPDU  *segmentAPDU // TODO: rename that to segmentAPDUSource or something
+	segmentSize  uint
+	segmentCount uint8
+
+	retryCount            uint
+	segmentRetryCount     uint
+	sentAllSegments       bool
+	lastSequenceNumber    uint8
+	initialSequenceNumber uint8
+	actualWindowSize      uint8
+
+	numberOfApduRetries   uint
+	apduTimeout           uint
+	segmentationSupported readWriteModel.BACnetSegmentation
+	segmentTimeout        uint
+	maxSegmentsAccepted   readWriteModel.MaxSegmentsAccepted
+	maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
+}
+
+func NewSSM(sap ServiceAccessPoint, pduAddress []byte) (SSM, error) {
+	deviceEntry, err := sap.GetDeviceInventory().getEntryForDestination(pduAddress)
+	if err != nil {
+		return SSM{}, errors.Wrap(err, "Can't create SSM")
+	}
+	localDevice := sap.GetLocalDevice()
+	return SSM{
+		ssmSAP:                sap,
+		pduAddress:            pduAddress,
+		deviceEntry:           deviceEntry,
+		state:                 IDLE,
+		numberOfApduRetries:   localDevice.NumberOfAPDURetries,
+		apduTimeout:           localDevice.APDUTimeout,
+		segmentationSupported: localDevice.SegmentationSupported,
+		segmentTimeout:        localDevice.APDUSegmentTimeout,
+		maxSegmentsAccepted:   localDevice.MaxSegmentsAccepted,
+		maxApduLengthAccepted: localDevice.MaximumApduLengthAccepted,
+	}, nil
+}
+
+func (s *SSM) startTimer(millis int64) {
+	s.restartTimer(millis)
+}
+
+func (s *SSM) stopTimer() {
+	if s.isScheduled {
+		s.suspendTask()
+	}
+}
+
+func (s *SSM) restartTimer(millis int64) {
+	if s.isScheduled {
+		s.suspendTask()
+	}
+
+	delta := time.Millisecond * time.Duration(millis)
+	s.installTask(nil, &delta)
+}
+
+func (s *SSM) setState(newState SSMState, timer *int64) error {
+	if s.state == COMPLETED || s.state == ABORTED {
+		return errors.Errorf("Invalid state transition from %s to %s", s.state, newState)
+	}
+
+	s.stopTimer()
+
+	s.state = newState
+
+	if timer != nil {
+		s.startTimer(*timer)
+	}
+	return nil
+}
+
+func (s *SSM) setSegmentationContext(apdu readWriteModel.APDU) error {
+	switch apdu := apdu.(type) {
+	case readWriteModel.APDUConfirmedRequestExactly:
+		if apdu.GetSegmentedMessage() || apdu.GetMoreFollows() {
+			return errors.New("Can't handle already segmented message")
+		}
+		bytes, err := apdu.GetServiceRequest().Serialize()
+		if err != nil {
+			return errors.Wrap(err, "Can serialize service request")
+		}
+		segmentAPDU := segmentAPDU{
+			originalApdu:     apdu,
+			originalInvokeId: apdu.GetInvokeId(),
+			serviceBytes:     bytes,
+			serviceChoice:    apdu.GetServiceRequest().GetServiceChoice(),
+		}
+		s.segmentAPDU = &segmentAPDU
+	case readWriteModel.APDUComplexAckExactly:
+		if apdu.GetSegmentedMessage() || apdu.GetMoreFollows() {
+			return errors.New("Can't handle already segmented message")
+		}
+		bytes, err := apdu.GetServiceAck().Serialize()
+		if err != nil {
+			return errors.Wrap(err, "Can serialize service request")
+		}
+		segmentAPDU := segmentAPDU{
+			originalApdu:  apdu,
+			serviceBytes:  bytes,
+			serviceChoice: apdu.GetServiceAck().GetServiceChoice(),
+			isAck:         true,
+		}
+		s.segmentAPDU = &segmentAPDU
+	default:
+		return errors.Errorf("invalid APDU type %T", apdu)
+	}
+	return nil
+}
+
+func (s *SSM) getSegment(index uint8) (segmentAPDU readWriteModel.APDU, moreFollows bool, err error) {
+	if s.segmentAPDU == nil {
+		return nil, false, errors.New("No segment apdu set")
+	}
+
+	if index > s.segmentCount {
+		return nil, false, errors.Errorf("Invalid segment number %d, APDU has %d segments", index, s.segmentCount)
+	}
+
+	// TODO: the original code does here something funky but it seems it is best to just return the original apdu
+	if s.segmentCount == 1 {
+		return s.segmentAPDU.originalApdu, false, nil
+	}
+
+	moreFollows = index < s.segmentCount-1
+	sequenceNumber := index % 255
+	proposedWindowSize := s.actualWindowSize
+	if index == 0 {
+		proposedWindowSize = s.ssmSAP.GetProposedWindowSize()
+	}
+	serviceChoice := &s.segmentAPDU.serviceChoice
+	offset := uint(index) * s.segmentSize
+	segmentBytes := s.segmentAPDU.serviceBytes[offset : offset+s.segmentSize]
+	if !s.segmentAPDU.isAck {
+		segmentAPDU = readWriteModel.NewAPDUConfirmedRequest(
+			true,
+			moreFollows,
+			s.segmentationSupported == readWriteModel.BACnetSegmentation_SEGMENTED_RECEIVE || s.segmentationSupported == readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
+			s.maxSegmentsAccepted,
+			s.maxApduLengthAccepted,
+			s.segmentAPDU.originalInvokeId,
+			&sequenceNumber,
+			&proposedWindowSize,
+			nil,
+			serviceChoice,
+			segmentBytes,
+			0,
+		)
+	} else {
+		segmentAPDU = readWriteModel.NewAPDUComplexAck(
+			true,
+			moreFollows,
+			s.segmentAPDU.originalInvokeId,
+			&sequenceNumber,
+			&proposedWindowSize,
+			nil,
+			serviceChoice,
+			segmentBytes,
+			0,
+		)
+	}
+	return segmentAPDU, moreFollows, nil
+}
+
+// TODO: check that function. looks a bit wonky to just append the payloads like that
+func (s *SSM) appendSegment(apdu readWriteModel.APDU) error {
+	switch apdu := apdu.(type) {
+	case readWriteModel.APDUConfirmedRequestExactly:
+		if apdu.GetSegmentedMessage() || apdu.GetMoreFollows() {
+			return errors.New("Can't handle already segmented message")
+		}
+		bytes, err := apdu.GetServiceRequest().Serialize()
+		if err != nil {
+			return errors.Wrap(err, "Can serialize service request")
+		}
+		s.segmentAPDU.serviceBytes = append(s.segmentAPDU.serviceBytes, bytes...)
+	case readWriteModel.APDUComplexAckExactly:
+		if apdu.GetSegmentedMessage() || apdu.GetMoreFollows() {
+			return errors.New("Can't handle already segmented message")
+		}
+		bytes, err := apdu.GetServiceAck().Serialize()
+		if err != nil {
+			return errors.Wrap(err, "Can serialize service request")
+		}
+		s.segmentAPDU.serviceBytes = append(s.segmentAPDU.serviceBytes, bytes...)
+	default:
+		return errors.Errorf("invalid APDU type %T", apdu)
+	}
+	return nil
+}
+
+func (s *SSM) inWindow(sequenceA, sequenceB uint) bool {
+	return (sequenceA-sequenceB-256)%256 < uint(s.actualWindowSize)
+}
+
+func (s *SSM) fillWindow(sequenceNumber uint8) error {
+	for i := uint8(0); i < s.actualWindowSize; i++ {
+		apdu, moreFollows, err := s.getSegment(sequenceNumber + i)
+		if err != nil {
+			return errors.Wrapf(err, "Error sending out segment %d", i)
+		}
+		s.ssmSAP.Request(apdu)
+		if moreFollows {
+			s.sentAllSegments = true
+		}
+	}
+	return nil
+}
+
+type ClientSSM struct {
+	SSM
+}
+
+func NewClientSSM(sap ServiceAccessPoint, pduAddress []byte) (ClientSSM, error) {
+	ssm, err := NewSSM(sap, pduAddress)
+	if err != nil {
+		return ClientSSM{}, err
+	}
+	// TODO: if deviceEntry is not there get it now...
+	if &ssm.deviceEntry == &NoDeviceEntry {
+		// TODO: get entry for device, store it in inventory
+	}
+	return ClientSSM{
+		SSM: ssm,
+	}, nil
+}
+
+func (s *ClientSSM) setState(newState SSMState, timer *int64) error {
+	// do the regular state change
+	if err := s.SSM.setState(newState, timer); err != nil {
+		return errors.Wrap(err, "error during SSM state transition")
+	}
+
+	if s.state == COMPLETED || s.state == ABORTED {
+		s.ssmSAP.GetClientTransactions() // TODO remove this
+		if &s.deviceEntry != &NoDeviceEntry {
+			// TODO: release device entry
+		}
+	}
+	return nil
+}
+
+func (s *ClientSSM) request(apdu readWriteModel.APDU) {
+	// TODO: ensure apdu has destination, otherwise
+	// TODO: we would need a BVLC to send something or not... maybe the todo above is nonsense, as we are in a connection context
+	s.ssmSAP.Request(apdu)
+}
diff --git a/plc4go/internal/bacnetip/TransactionStateMachine_test.go b/plc4go/internal/bacnetip/TransactionStateMachine_test.go
new file mode 100644
index 0000000000..71944a3517
--- /dev/null
+++ b/plc4go/internal/bacnetip/TransactionStateMachine_test.go
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package bacnetip
+
+import (
+	"context"
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/bacnetip/readwrite/model"
+	"github.com/apache/plc4x/plc4go/spi"
+	"github.com/apache/plc4x/plc4go/spi/transports/test"
+	"reflect"
+	"testing"
+	"time"
+)
+
+func TestTransactionStateMachine_Expect(t1 *testing.T) {
+	type fields struct {
+		MessageCodec          *MessageCodec
+		deviceInventory       *DeviceInventory
+		retryCount            int
+		segmentRetryCount     int
+		duplicateCount        int
+		sentAllSegments       bool
+		lastSequenceNumber    int
+		initialSequenceNumber int
+		actualWindowSize      int
+		proposeWindowSize     int
+		segmentTimer          int
+		RequestTimer          int
+	}
+	type args struct {
+		ctx            context.Context
+		acceptsMessage spi.AcceptsMessage
+		handleMessage  spi.HandleMessage
+		handleError    spi.HandleError
+		ttl            time.Duration
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr bool
+	}{
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &TransactionStateMachine{
+				MessageCodec:          tt.fields.MessageCodec,
+				deviceInventory:       tt.fields.deviceInventory,
+				retryCount:            tt.fields.retryCount,
+				segmentRetryCount:     tt.fields.segmentRetryCount,
+				duplicateCount:        tt.fields.duplicateCount,
+				sentAllSegments:       tt.fields.sentAllSegments,
+				lastSequenceNumber:    tt.fields.lastSequenceNumber,
+				initialSequenceNumber: tt.fields.initialSequenceNumber,
+				actualWindowSize:      tt.fields.actualWindowSize,
+				proposeWindowSize:     tt.fields.proposeWindowSize,
+				segmentTimer:          tt.fields.segmentTimer,
+				RequestTimer:          tt.fields.RequestTimer,
+			}
+			if err := t.Expect(tt.args.ctx, tt.args.acceptsMessage, tt.args.handleMessage, tt.args.handleError, tt.args.ttl); (err != nil) != tt.wantErr {
+				t1.Errorf("Expect() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestTransactionStateMachine_GetCodec(t1 *testing.T) {
+	type fields struct {
+		MessageCodec          *MessageCodec
+		deviceInventory       *DeviceInventory
+		retryCount            int
+		segmentRetryCount     int
+		duplicateCount        int
+		sentAllSegments       bool
+		lastSequenceNumber    int
+		initialSequenceNumber int
+		actualWindowSize      int
+		proposeWindowSize     int
+		segmentTimer          int
+		RequestTimer          int
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		want   spi.MessageCodec
+	}{
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &TransactionStateMachine{
+				MessageCodec:          tt.fields.MessageCodec,
+				deviceInventory:       tt.fields.deviceInventory,
+				retryCount:            tt.fields.retryCount,
+				segmentRetryCount:     tt.fields.segmentRetryCount,
+				duplicateCount:        tt.fields.duplicateCount,
+				sentAllSegments:       tt.fields.sentAllSegments,
+				lastSequenceNumber:    tt.fields.lastSequenceNumber,
+				initialSequenceNumber: tt.fields.initialSequenceNumber,
+				actualWindowSize:      tt.fields.actualWindowSize,
+				proposeWindowSize:     tt.fields.proposeWindowSize,
+				segmentTimer:          tt.fields.segmentTimer,
+				RequestTimer:          tt.fields.RequestTimer,
+			}
+			if got := t.GetCodec(); !reflect.DeepEqual(got, tt.want) {
+				t1.Errorf("GetCodec() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestTransactionStateMachine_Send(t1 *testing.T) {
+	type fields struct {
+		MessageCodec          *MessageCodec
+		deviceInventory       *DeviceInventory
+		retryCount            int
+		segmentRetryCount     int
+		duplicateCount        int
+		sentAllSegments       bool
+		lastSequenceNumber    int
+		initialSequenceNumber int
+		actualWindowSize      int
+		proposeWindowSize     int
+		segmentTimer          int
+		RequestTimer          int
+	}
+	type args struct {
+		message spi.Message
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr bool
+	}{
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &TransactionStateMachine{
+				MessageCodec:          tt.fields.MessageCodec,
+				deviceInventory:       tt.fields.deviceInventory,
+				retryCount:            tt.fields.retryCount,
+				segmentRetryCount:     tt.fields.segmentRetryCount,
+				duplicateCount:        tt.fields.duplicateCount,
+				sentAllSegments:       tt.fields.sentAllSegments,
+				lastSequenceNumber:    tt.fields.lastSequenceNumber,
+				initialSequenceNumber: tt.fields.initialSequenceNumber,
+				actualWindowSize:      tt.fields.actualWindowSize,
+				proposeWindowSize:     tt.fields.proposeWindowSize,
+				segmentTimer:          tt.fields.segmentTimer,
+				RequestTimer:          tt.fields.RequestTimer,
+			}
+			if err := t.Send(tt.args.message); (err != nil) != tt.wantErr {
+				t1.Errorf("Send() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestTransactionStateMachine_SendRequest(t1 *testing.T) {
+	type fields struct {
+		MessageCodec          *MessageCodec
+		deviceInventory       *DeviceInventory
+		retryCount            int
+		segmentRetryCount     int
+		duplicateCount        int
+		sentAllSegments       bool
+		lastSequenceNumber    int
+		initialSequenceNumber int
+		actualWindowSize      int
+		proposeWindowSize     int
+		segmentTimer          int
+		RequestTimer          int
+	}
+	type args struct {
+		ctx            context.Context
+		message        spi.Message
+		acceptsMessage spi.AcceptsMessage
+		handleMessage  spi.HandleMessage
+		handleError    spi.HandleError
+		ttl            time.Duration
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr bool
+	}{
+		// TODO: Add test cases.
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &TransactionStateMachine{
+				MessageCodec:          tt.fields.MessageCodec,
+				deviceInventory:       tt.fields.deviceInventory,
+				retryCount:            tt.fields.retryCount,
+				segmentRetryCount:     tt.fields.segmentRetryCount,
+				duplicateCount:        tt.fields.duplicateCount,
+				sentAllSegments:       tt.fields.sentAllSegments,
+				lastSequenceNumber:    tt.fields.lastSequenceNumber,
+				initialSequenceNumber: tt.fields.initialSequenceNumber,
+				actualWindowSize:      tt.fields.actualWindowSize,
+				proposeWindowSize:     tt.fields.proposeWindowSize,
+				segmentTimer:          tt.fields.segmentTimer,
+				RequestTimer:          tt.fields.RequestTimer,
+			}
+			if err := t.SendRequest(tt.args.ctx, tt.args.message, tt.args.acceptsMessage, tt.args.handleMessage, tt.args.handleError, tt.args.ttl); (err != nil) != tt.wantErr {
+				t1.Errorf("SendRequest() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestTransactionStateMachine_handleOutboundMessage(t1 *testing.T) {
+	type fields struct {
+		MessageCodec          *MessageCodec
+		deviceInventory       *DeviceInventory
+		retryCount            int
+		segmentRetryCount     int
+		duplicateCount        int
+		sentAllSegments       bool
+		lastSequenceNumber    int
+		initialSequenceNumber int
+		actualWindowSize      int
+		proposeWindowSize     int
+		segmentTimer          int
+		RequestTimer          int
+	}
+	type args struct {
+		message spi.Message
+	}
+	tests := []struct {
+		name        string
+		fields      fields
+		args        args
+		wantHandled bool
+		wantErr     bool
+	}{
+		{
+			name: "message not relevant",
+		},
+		{
+			name: "Normal unsgemented message",
+			fields: fields{
+				MessageCodec:          NewMessageCodec(test.NewTransportInstance(test.NewTransport())),
+				deviceInventory:       nil,
+				retryCount:            0,
+				segmentRetryCount:     0,
+				duplicateCount:        0,
+				sentAllSegments:       false,
+				lastSequenceNumber:    0,
+				initialSequenceNumber: 0,
+				actualWindowSize:      0,
+				proposeWindowSize:     0,
+				segmentTimer:          0,
+				RequestTimer:          0,
+			},
+			args: args{
+				message: readWriteModel.NewBVLCOriginalUnicastNPDU(
+					readWriteModel.NewNPDU(
+						1,
+						readWriteModel.NewNPDUControl(false, false, false, false, readWriteModel.NPDUNetworkPriority_NORMAL_MESSAGE),
+						nil,
+						nil,
+						nil,
+						nil,
+						nil,
+						nil,
+						nil,
+						nil,
+						readWriteModel.NewAPDUComplexAck(
+							false,
+							false,
+							13,
+							nil,
+							nil,
+							readWriteModel.NewBACnetServiceAckReadProperty(
+								readWriteModel.CreateBACnetContextTagObjectIdentifier(0, 2, 1),
+								readWriteModel.CreateBACnetPropertyIdentifierTagged(1, 85),
+								nil,
+								readWriteModel.NewBACnetConstructedDataAnalogValuePresentValue(
+									readWriteModel.CreateBACnetApplicationTagReal(101),
+									readWriteModel.CreateBACnetOpeningTag(3),
+									readWriteModel.CreateBACnetTagHeaderBalanced(true, 3, 3),
+									readWriteModel.CreateBACnetClosingTag(3),
+									3,
+									nil,
+								),
+								0,
+							),
+							nil,
+							nil,
+							0,
+						),
+						0,
+					),
+					0,
+				),
+			},
+		},
+		{
+			name: "Normal segmented message",
+			fields: fields{
+				MessageCodec: NewMessageCodec(test.NewTransportInstance(test.NewTransport())),
+				deviceInventory: func() *DeviceInventory {
+					var deviceInventory = DeviceInventory{
+						devices: map[string]DeviceEntry{
+							"123": {
+								DeviceIdentifier:          nil,
+								MaximumApduLengthAccepted: readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_206,
+								SegmentationSupported:     readWriteModel.BACnetSegmentation_SEGMENTED_BOTH,
+								VendorId:                  0,
+								DeviceObjects:             nil,
+							},
+						},
+					}
+					return &deviceInventory
+				}(),
+				retryCount:            0,
+				segmentRetryCount:     0,
+				duplicateCount:        0,
+				sentAllSegments:       false,
+				lastSequenceNumber:    0,
+				initialSequenceNumber: 0,
+				actualWindowSize:      0,
+				proposeWindowSize:     0,
+				segmentTimer:          0,
+				RequestTimer:          0,
+			},
+			args: args{
+				message: readWriteModel.NewBVLCOriginalUnicastNPDU(
+					readWriteModel.NewNPDU(
+						1,
+						readWriteModel.NewNPDUControl(false, true, false, false, readWriteModel.NPDUNetworkPriority_NORMAL_MESSAGE),
+						nil,
+						func() *uint8 {
+							var elements uint8 = 3
+							return &elements
+						}(),
+						[]uint8{0x31, 0x32, 0x33},
+						nil,
+						nil,
+						nil,
+						nil,
+						nil,
+						readWriteModel.NewAPDUComplexAck(
+							false,
+							false,
+							13,
+							nil,
+							nil,
+							readWriteModel.NewBACnetServiceAckReadProperty(
+								readWriteModel.CreateBACnetContextTagObjectIdentifier(0, 2, 1),
+								readWriteModel.CreateBACnetPropertyIdentifierTagged(1, 85),
+								nil,
+								readWriteModel.NewBACnetConstructedDataActionText(
+									readWriteModel.CreateBACnetApplicationTagUnsignedInteger(100),
+									func() []readWriteModel.BACnetApplicationTagCharacterString {
+										var characterStrings []readWriteModel.BACnetApplicationTagCharacterString
+										for i := 0; i < 100; i++ {
+											characterStrings = append(characterStrings, readWriteModel.CreateBACnetApplicationTagCharacterString(readWriteModel.BACnetCharacterEncoding_ISO_10646, "ALAAARM!!"))
+										}
+										return characterStrings
+									}(),
+									readWriteModel.CreateBACnetOpeningTag(3),
+									readWriteModel.CreateBACnetTagHeaderBalanced(true, 3, 3),
+									readWriteModel.CreateBACnetClosingTag(3),
+									3,
+									nil,
+								),
+								0,
+							),
+							nil,
+							nil,
+							0,
+						),
+						0,
+					),
+					0,
+				),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t1.Run(tt.name, func(t1 *testing.T) {
+			t := &TransactionStateMachine{
+				MessageCodec:          tt.fields.MessageCodec,
+				deviceInventory:       tt.fields.deviceInventory,
+				retryCount:            tt.fields.retryCount,
+				segmentRetryCount:     tt.fields.segmentRetryCount,
+				duplicateCount:        tt.fields.duplicateCount,
+				sentAllSegments:       tt.fields.sentAllSegments,
+				lastSequenceNumber:    tt.fields.lastSequenceNumber,
+				initialSequenceNumber: tt.fields.initialSequenceNumber,
+				actualWindowSize:      tt.fields.actualWindowSize,
+				proposeWindowSize:     tt.fields.proposeWindowSize,
+				segmentTimer:          tt.fields.segmentTimer,
+				RequestTimer:          tt.fields.RequestTimer,
+			}
+			gotHandled, err := t.handleOutboundMessage(tt.args.message)
+			if (err != nil) != tt.wantErr {
+				t1.Errorf("handleOutboundMessage() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if gotHandled != tt.wantHandled {
+				t1.Errorf("handleOutboundMessage() gotHandled = %v, want %v", gotHandled, tt.wantHandled)
+			}
+		})
+	}
+}