You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/15 10:55:43 UTC

[plc4x] branch develop updated: feat(plc4go/spi): integrate ctx into DefaultCodec

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new aa93c2784 feat(plc4go/spi): integrate ctx into DefaultCodec
aa93c2784 is described below

commit aa93c27845fc415ec7a73af2c6950f41a673f88c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 15 12:55:35 2022 +0200

    feat(plc4go/spi): integrate ctx into DefaultCodec
---
 plc4go/internal/ads/Reader.go                      |   79 +-
 plc4go/internal/ads/Writer.go                      |   55 +-
 plc4go/internal/bacnetip/Reader.go                 |   95 +-
 plc4go/internal/cbus/Connection.go                 |  149 ++-
 plc4go/internal/cbus/Reader.go                     |  255 ++---
 plc4go/internal/eip/Connection.go                  |   93 +-
 plc4go/internal/eip/Reader.go                      |  180 ++-
 plc4go/internal/eip/Writer.go                      |  180 ++-
 plc4go/internal/knxnetip/Browser.go                |   15 +-
 plc4go/internal/knxnetip/Connection.go             |   19 +-
 .../knxnetip/ConnectionDriverSpecificOperations.go |   41 +-
 plc4go/internal/knxnetip/ConnectionHelper.go       |    7 +-
 .../knxnetip/ConnectionInternalOperations.go       | 1189 +++++++++-----------
 plc4go/internal/knxnetip/Reader.go                 |   10 +-
 plc4go/internal/modbus/Connection.go               |   46 +-
 plc4go/internal/modbus/Reader.go                   |   59 +-
 plc4go/internal/modbus/Writer.go                   |   57 +-
 plc4go/internal/s7/Connection.go                   |  191 ++--
 plc4go/internal/s7/Reader.go                       |   81 +-
 plc4go/internal/s7/Writer.go                       |   83 +-
 plc4go/spi/MessageCodec.go                         |    8 +-
 plc4go/spi/default/DefaultCodec.go                 |   33 +-
 22 files changed, 1398 insertions(+), 1527 deletions(-)

diff --git a/plc4go/internal/ads/Reader.go b/plc4go/internal/ads/Reader.go
index a574b948c..bc0c0e652 100644
--- a/plc4go/internal/ads/Reader.go
+++ b/plc4go/internal/ads/Reader.go
@@ -64,15 +64,15 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 	result := make(chan model.PlcReadRequestResult)
 	go func() {
 		if len(readRequest.GetFieldNames()) <= 1 {
-			m.singleRead(readRequest, result)
+			m.singleRead(ctx, readRequest, result)
 		} else {
-			m.multiRead(readRequest, result)
+			m.multiRead(ctx, readRequest, result)
 		}
 	}()
 	return result
 }
 
-func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
+func (m *Reader) singleRead(ctx context.Context, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
 	if len(readRequest.GetFieldNames()) != 1 {
 		result <- &plc4goModel.DefaultPlcReadRequestResult{
 			Request:  readRequest,
@@ -96,7 +96,7 @@ func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.
 			log.Debug().Msgf("Invalid field item type %T", field)
 			return
 		}
-		field, err = m.resolveField(adsField)
+		field, err = m.resolveField(ctx, adsField)
 		if err != nil {
 			result <- &plc4goModel.DefaultPlcReadRequestResult{
 				Request:  readRequest,
@@ -150,10 +150,10 @@ func (m *Reader) singleRead(readRequest model.PlcReadRequest, result chan model.
 	}
 	userdata.Data = readWriteModel.NewAdsReadRequest(adsField.IndexGroup, adsField.IndexOffset, readLength)
 
-	m.sendOverTheWire(userdata, readRequest, result)
+	m.sendOverTheWire(ctx, userdata, readRequest, result)
 }
 
-func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
+func (m *Reader) multiRead(ctx context.Context, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
 	// Calculate the size of all fields together.
 	// Calculate the expected size of the response data.
 	expectedResponseDataSize := uint32(0)
@@ -216,7 +216,7 @@ func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.P
 				log.Debug().Msgf("Invalid field item type %T", field)
 				return
 			}
-			field, err = m.resolveField(adsField)
+			field, err = m.resolveField(ctx, adsField)
 			if err != nil {
 				result <- &plc4goModel.DefaultPlcReadRequestResult{
 					Request:  readRequest,
@@ -242,10 +242,10 @@ func (m *Reader) multiRead(readRequest model.PlcReadRequest, result chan model.P
 	}
 	userdata.Data = readWriteModel.NewAdsReadWriteRequest(uint32(readWriteModel.ReservedIndexGroups_ADSIGRP_MULTIPLE_READ), uint32(len(readRequest.GetFieldNames())), expectedResponseDataSize, items, nil)
 
-	m.sendOverTheWire(userdata, readRequest, result)
+	m.sendOverTheWire(ctx, userdata, readRequest, result)
 }
 
-func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
+func (m *Reader) sendOverTheWire(ctx context.Context, userdata readWriteModel.AmsPacket, readRequest model.PlcReadRequest, result chan model.PlcReadRequestResult) {
 	// Calculate a new transaction identifier
 	transactionIdentifier := atomic.AddUint32(&m.transactionIdentifier, 1)
 	if transactionIdentifier > math.MaxUint8 {
@@ -271,42 +271,37 @@ func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest
 
 	// Send the TCP Paket over the wire
 	log.Trace().Msg("Send TCP Paket")
-	if err := m.messageCodec.SendRequest(
-		amsTcpPaket,
-		func(message spi.Message) bool {
-			paket := message.(readWriteModel.AmsTCPPacket)
-			return paket.GetUserdata().GetInvokeId() == transactionIdentifier
-		},
-		func(message spi.Message) error {
-			// Convert the response into an amsTcpPaket
-			log.Trace().Msg("convert response to amsTcpPaket")
-			receivedAmsTcpPaket := message.(readWriteModel.AmsTCPPacket)
-			// Convert the ads response into a PLC4X response
-			log.Trace().Msg("convert response to PLC4X response")
-			readResponse, err := m.ToPlc4xReadResponse(receivedAmsTcpPaket, readRequest)
+	if err := m.messageCodec.SendRequest(ctx, amsTcpPaket, func(message spi.Message) bool {
+		paket := message.(readWriteModel.AmsTCPPacket)
+		return paket.GetUserdata().GetInvokeId() == transactionIdentifier
+	}, func(message spi.Message) error {
+		// Convert the response into an amsTcpPaket
+		log.Trace().Msg("convert response to amsTcpPaket")
+		receivedAmsTcpPaket := message.(readWriteModel.AmsTCPPacket)
+		// Convert the ads response into a PLC4X response
+		log.Trace().Msg("convert response to PLC4X response")
+		readResponse, err := m.ToPlc4xReadResponse(receivedAmsTcpPaket, readRequest)
 
-			if err != nil {
-				result <- &plc4goModel.DefaultPlcReadRequestResult{
-					Request: readRequest,
-					Err:     errors.Wrap(err, "Error decoding response"),
-				}
-				// TODO: should we return the error here?
-				return nil
-			}
-			result <- &plc4goModel.DefaultPlcReadRequestResult{
-				Request:  readRequest,
-				Response: readResponse,
-			}
-			return nil
-		},
-		func(err error) error {
+		if err != nil {
 			result <- &plc4goModel.DefaultPlcReadRequestResult{
 				Request: readRequest,
-				Err:     errors.Wrap(err, "got timeout while waiting for response"),
+				Err:     errors.Wrap(err, "Error decoding response"),
 			}
+			// TODO: should we return the error here?
 			return nil
-		},
-		time.Second*1); err != nil {
+		}
+		result <- &plc4goModel.DefaultPlcReadRequestResult{
+			Request:  readRequest,
+			Response: readResponse,
+		}
+		return nil
+	}, func(err error) error {
+		result <- &plc4goModel.DefaultPlcReadRequestResult{
+			Request: readRequest,
+			Err:     errors.Wrap(err, "got timeout while waiting for response"),
+		}
+		return nil
+	}, time.Second*1); err != nil {
 		result <- &plc4goModel.DefaultPlcReadRequestResult{
 			Request:  readRequest,
 			Response: nil,
@@ -315,7 +310,7 @@ func (m *Reader) sendOverTheWire(userdata readWriteModel.AmsPacket, readRequest
 	}
 }
 
-func (m *Reader) resolveField(symbolicField SymbolicPlcField) (DirectPlcField, error) {
+func (m *Reader) resolveField(ctx context.Context, symbolicField SymbolicPlcField) (DirectPlcField, error) {
 	if directPlcField, ok := m.fieldMapping[symbolicField]; ok {
 		return directPlcField, nil
 	}
@@ -346,7 +341,7 @@ func (m *Reader) resolveField(symbolicField SymbolicPlcField) (DirectPlcField, e
 	result := make(chan model.PlcReadRequestResult)
 	go func() {
 		dummyRequest := plc4goModel.NewDefaultPlcReadRequest(map[string]model.PlcField{"dummy": DirectPlcField{PlcField: PlcField{Datatype: readWriteModel.AdsDataType_UINT32}}}, []string{"dummy"}, nil, nil)
-		m.sendOverTheWire(userdata, dummyRequest, result)
+		m.sendOverTheWire(ctx, userdata, dummyRequest, result)
 	}()
 	// We wait synchronous for the resolution response before we can continue
 	response := <-result
diff --git a/plc4go/internal/ads/Writer.go b/plc4go/internal/ads/Writer.go
index fa41fbdae..a6f5ba200 100644
--- a/plc4go/internal/ads/Writer.go
+++ b/plc4go/internal/ads/Writer.go
@@ -83,7 +83,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest)
 				log.Debug().Msgf("Invalid field item type %T", field)
 				return
 			}
-			field, err = m.reader.resolveField(adsField)
+			field, err = m.reader.resolveField(ctx, adsField)
 			if err != nil {
 				result <- &plc4goModel.DefaultPlcWriteRequestResult{
 					Request:  writeRequest,
@@ -159,39 +159,34 @@ func (m *Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest)
 		amsTcpPaket := readWriteModel.NewAmsTCPPacket(userdata)
 
 		// Send the TCP Paket over the wire
-		err = m.messageCodec.SendRequest(
-			amsTcpPaket,
-			func(message spi.Message) bool {
-				paket := readWriteModel.CastAmsTCPPacket(message)
-				return paket.GetUserdata().GetInvokeId() == transactionIdentifier
-			},
-			func(message spi.Message) error {
-				// Convert the response into an responseAmsTcpPaket
-				responseAmsTcpPaket := readWriteModel.CastAmsTCPPacket(message)
-				// Convert the ads response into a PLC4X response
-				readResponse, err := m.ToPlc4xWriteResponse(amsTcpPaket, responseAmsTcpPaket, writeRequest)
+		err = m.messageCodec.SendRequest(ctx, amsTcpPaket, func(message spi.Message) bool {
+			paket := readWriteModel.CastAmsTCPPacket(message)
+			return paket.GetUserdata().GetInvokeId() == transactionIdentifier
+		}, func(message spi.Message) error {
+			// Convert the response into an responseAmsTcpPaket
+			responseAmsTcpPaket := readWriteModel.CastAmsTCPPacket(message)
+			// Convert the ads response into a PLC4X response
+			readResponse, err := m.ToPlc4xWriteResponse(amsTcpPaket, responseAmsTcpPaket, writeRequest)
 
-				if err != nil {
-					result <- &plc4goModel.DefaultPlcWriteRequestResult{
-						Request: writeRequest,
-						Err:     errors.Wrap(err, "Error decoding response"),
-					}
-				} else {
-					result <- &plc4goModel.DefaultPlcWriteRequestResult{
-						Request:  writeRequest,
-						Response: readResponse,
-					}
-				}
-				return nil
-			},
-			func(err error) error {
+			if err != nil {
 				result <- &plc4goModel.DefaultPlcWriteRequestResult{
 					Request: writeRequest,
-					Err:     errors.New("got timeout while waiting for response"),
+					Err:     errors.Wrap(err, "Error decoding response"),
+				}
+			} else {
+				result <- &plc4goModel.DefaultPlcWriteRequestResult{
+					Request:  writeRequest,
+					Response: readResponse,
 				}
-				return nil
-			},
-			time.Second*1)
+			}
+			return nil
+		}, func(err error) error {
+			result <- &plc4goModel.DefaultPlcWriteRequestResult{
+				Request: writeRequest,
+				Err:     errors.New("got timeout while waiting for response"),
+			}
+			return nil
+		}, time.Second*1)
 	}()
 	return result
 }
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index 13e3c17cc..a950cccb5 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -188,63 +188,58 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 
 			// Send the  over the wire
 			log.Trace().Msg("Send ")
-			if err := m.messageCodec.SendRequest(
-				bvlc,
-				func(message spi.Message) bool {
-					bvlc, ok := message.(readWriteModel.BVLC)
-					if !ok {
-						log.Debug().Msgf("Received strange type %T", bvlc)
-						return false
-					}
-					var npdu readWriteModel.NPDU
-					if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
-						npdu = npduRetriever.GetNpdu()
-					} else {
-						log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
-						return false
-					}
-					if npdu.GetControl().GetMessageTypeFieldPresent() {
-						return false
-					}
-					if invokeIdFromApdu, err := getInvokeIdFromApdu(npdu.GetApdu()); err != nil {
-						log.Debug().Err(err).Msg("Error getting invoke id")
-						return false
-					} else {
-						return invokeIdFromApdu == invokeId
-					}
-				},
-				func(message spi.Message) error {
-					// Convert the response into an
-					log.Trace().Msg("convert response to ")
-					apdu := message.(readWriteModel.BVLC).(interface{ GetNpdu() readWriteModel.NPDU }).GetNpdu().GetApdu()
+			if err := m.messageCodec.SendRequest(ctx, bvlc, func(message spi.Message) bool {
+				bvlc, ok := message.(readWriteModel.BVLC)
+				if !ok {
+					log.Debug().Msgf("Received strange type %T", bvlc)
+					return false
+				}
+				var npdu readWriteModel.NPDU
+				if npduRetriever, ok := bvlc.(interface{ GetNpdu() readWriteModel.NPDU }); ok {
+					npdu = npduRetriever.GetNpdu()
+				} else {
+					log.Debug().Msgf("bvlc has no way to give a npdu %T", bvlc)
+					return false
+				}
+				if npdu.GetControl().GetMessageTypeFieldPresent() {
+					return false
+				}
+				if invokeIdFromApdu, err := getInvokeIdFromApdu(npdu.GetApdu()); err != nil {
+					log.Debug().Err(err).Msg("Error getting invoke id")
+					return false
+				} else {
+					return invokeIdFromApdu == invokeId
+				}
+			}, func(message spi.Message) error {
+				// Convert the response into an
+				log.Trace().Msg("convert response to ")
+				apdu := message.(readWriteModel.BVLC).(interface{ GetNpdu() readWriteModel.NPDU }).GetNpdu().GetApdu()
 
-					// TODO: implement segment handling
+				// TODO: implement segment handling
 
-					// Convert the bacnet response into a PLC4X response
-					log.Trace().Msg("convert response to PLC4X response")
-					readResponse, err := m.ToPlc4xReadResponse(apdu, readRequest)
+				// Convert the bacnet response into a PLC4X response
+				log.Trace().Msg("convert response to PLC4X response")
+				readResponse, err := m.ToPlc4xReadResponse(apdu, readRequest)
 
-					if err != nil {
-						result <- &spiModel.DefaultPlcReadRequestResult{
-							Request: readRequest,
-							Err:     errors.Wrap(err, "Error decoding response"),
-						}
-						return transaction.EndRequest()
-					}
-					result <- &spiModel.DefaultPlcReadRequestResult{
-						Request:  readRequest,
-						Response: readResponse,
-					}
-					return transaction.EndRequest()
-				},
-				func(err error) error {
+				if err != nil {
 					result <- &spiModel.DefaultPlcReadRequestResult{
 						Request: readRequest,
-						Err:     errors.Wrap(err, "got timeout while waiting for response"),
+						Err:     errors.Wrap(err, "Error decoding response"),
 					}
 					return transaction.EndRequest()
-				},
-				time.Second*1); err != nil {
+				}
+				result <- &spiModel.DefaultPlcReadRequestResult{
+					Request:  readRequest,
+					Response: readResponse,
+				}
+				return transaction.EndRequest()
+			}, func(err error) error {
+				result <- &spiModel.DefaultPlcReadRequestResult{
+					Request: readRequest,
+					Err:     errors.Wrap(err, "got timeout while waiting for response"),
+				}
+				return transaction.EndRequest()
+			}, time.Second*1); err != nil {
 				result <- &spiModel.DefaultPlcReadRequestResult{
 					Request:  readRequest,
 					Response: nil,
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index f341de773..16b245c33 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -20,6 +20,7 @@
 package cbus
 
 import (
+	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -106,6 +107,8 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 }
 
 func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+	// TODO: use proper context
+	ctx := context.TODO()
 	log.Trace().Msg("Connecting")
 	ch := make(chan plc4go.PlcConnectionConnectResult)
 	go func() {
@@ -116,7 +119,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 
 		// For testing purposes we can skip the waiting for a complete connection
 		if !c.driverContext.awaitSetupComplete {
-			go c.setupConnection(ch)
+			go c.setupConnection(ctx, ch)
 			log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
 			// Here we write directly and don't wait till the connection is "really" connected
 			// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
@@ -125,7 +128,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 			return
 		}
 
-		c.setupConnection(ch)
+		c.setupConnection(ctx, ch)
 	}()
 	return ch
 }
@@ -174,7 +177,7 @@ func (c *Connection) String() string {
 	return fmt.Sprintf("cbus.Connection")
 }
 
-func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) {
+func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
 	cbusOptions := &c.messageCodec.(*MessageCodec).cbusOptions
 	requestContext := &c.messageCodec.(*MessageCodec).requestContext
 
@@ -186,31 +189,25 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 
 		receivedResetEchoChan := make(chan bool)
 		receivedResetEchoErrorChan := make(chan error)
-		if err := c.messageCodec.SendRequest(
-			cBusMessage,
-			func(message spi.Message) bool {
-				cbusMessageToServer, ok := message.(readWriteModel.CBusMessageToServerExactly)
-				if !ok {
-					return false
-				}
-				_, ok = cbusMessageToServer.GetRequest().(readWriteModel.RequestResetExactly)
-				return ok
-			},
-			func(message spi.Message) error {
-				receivedResetEchoChan <- true
-				return nil
-			},
-			func(err error) error {
-				// If this is a timeout, do a check if the connection requires a reconnection
-				if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-					log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-					c.Close()
-				}
-				receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
-				return nil
-			},
-			c.GetTtl(),
-		); err != nil {
+		if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
+			cbusMessageToServer, ok := message.(readWriteModel.CBusMessageToServerExactly)
+			if !ok {
+				return false
+			}
+			_, ok = cbusMessageToServer.GetRequest().(readWriteModel.RequestResetExactly)
+			return ok
+		}, func(message spi.Message) error {
+			receivedResetEchoChan <- true
+			return nil
+		}, func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+				log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+				c.Close()
+			}
+			receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
+			return nil
+		}, c.GetTtl()); err != nil {
 			c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
 			return
 		}
@@ -231,11 +228,11 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 	{
 		log.Debug().Msg("Set application filter to all")
 		applicationAddress1 := readWriteModel.NewParameterValueApplicationAddress1(readWriteModel.NewApplicationAddress1(c.configuration.MonitoredApplication1), nil, 1)
-		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_APPLICATION_ADDRESS_1, applicationAddress1, requestContext, cbusOptions) {
+		if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_APPLICATION_ADDRESS_1, applicationAddress1, requestContext, cbusOptions) {
 			return
 		}
 		applicationAddress2 := readWriteModel.NewParameterValueApplicationAddress2(readWriteModel.NewApplicationAddress2(c.configuration.MonitoredApplication2), nil, 1)
-		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_APPLICATION_ADDRESS_2, applicationAddress2, requestContext, cbusOptions) {
+		if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_APPLICATION_ADDRESS_2, applicationAddress2, requestContext, cbusOptions) {
 			return
 		}
 		log.Debug().Msg("Application filter set")
@@ -243,7 +240,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 	{
 		log.Debug().Msg("Set interface options 3")
 		interfaceOptions3 := readWriteModel.NewParameterValueInterfaceOptions3(readWriteModel.NewInterfaceOptions3(c.configuration.Exstat, c.configuration.Pun, c.configuration.LocalSal, c.configuration.Pcn), nil, 1)
-		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_3, interfaceOptions3, requestContext, cbusOptions) {
+		if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_3, interfaceOptions3, requestContext, cbusOptions) {
 			return
 		}
 		// TODO: add localsal to the options
@@ -253,7 +250,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 	{
 		log.Debug().Msg("Set interface options 1 power up settings")
 		interfaceOptions1PowerUpSettings := readWriteModel.NewParameterValueInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1(c.configuration.Idmon, c.configuration.Monitor, c.configuration.Smart, c.configuration.Srchk, c.configuration.XonXoff, c.configuration.Connect)), 1)
-		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, interfaceOptions1PowerUpSettings, requestContext, cbusOptions) {
+		if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, interfaceOptions1PowerUpSettings, requestContext, cbusOptions) {
 			return
 		}
 		// TODO: what is with monall
@@ -263,7 +260,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 	{
 		log.Debug().Msg("Set interface options 1")
 		interfaceOptions1 := readWriteModel.NewParameterValueInterfaceOptions1(readWriteModel.NewInterfaceOptions1(c.configuration.Idmon, c.configuration.Monitor, c.configuration.Smart, c.configuration.Srchk, c.configuration.XonXoff, c.configuration.Connect), nil, 1)
-		if !c.sendCalDataWrite(ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1, interfaceOptions1, requestContext, cbusOptions) {
+		if !c.sendCalDataWrite(ctx, ch, readWriteModel.Parameter_INTERFACE_OPTIONS_1, interfaceOptions1, requestContext, cbusOptions) {
 			return
 		}
 		// TODO: what is with monall
@@ -319,7 +316,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 }
 
 // This is used for connection setup
-func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
+func (c *Connection) sendCalDataWrite(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
 	// TODO: we assume that is always a one byte request otherwise we need to map the length here
 	calData := readWriteModel.NewCALDataWrite(paramNo, 0x0, parameterValue, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
 	directCommand := readWriteModel.NewRequestDirectCommandAccess(calData /*we don't want a alpha otherwise the PCI will auto-switch*/, nil, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
@@ -327,61 +324,55 @@ func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult,
 
 	directCommandAckChan := make(chan bool)
 	directCommandAckErrorChan := make(chan error)
-	if err := c.messageCodec.SendRequest(
-		cBusMessage,
-		func(message spi.Message) bool {
-			switch message := message.(type) {
-			case readWriteModel.CBusMessageToClientExactly:
-				switch reply := message.GetReply().(type) {
-				case readWriteModel.ReplyOrConfirmationReplyExactly:
-					switch reply := reply.GetReply().(type) {
-					case readWriteModel.ReplyEncodedReplyExactly:
-						switch encodedReply := reply.GetEncodedReply().(type) {
-						case readWriteModel.EncodedReplyCALReplyExactly:
-							switch data := encodedReply.GetCalReply().GetCalData().(type) {
-							case readWriteModel.CALDataAcknowledgeExactly:
-								if data.GetParamNo() == paramNo {
-									return true
-								}
+	if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
+		switch message := message.(type) {
+		case readWriteModel.CBusMessageToClientExactly:
+			switch reply := message.GetReply().(type) {
+			case readWriteModel.ReplyOrConfirmationReplyExactly:
+				switch reply := reply.GetReply().(type) {
+				case readWriteModel.ReplyEncodedReplyExactly:
+					switch encodedReply := reply.GetEncodedReply().(type) {
+					case readWriteModel.EncodedReplyCALReplyExactly:
+						switch data := encodedReply.GetCalReply().GetCalData().(type) {
+						case readWriteModel.CALDataAcknowledgeExactly:
+							if data.GetParamNo() == paramNo {
+								return true
 							}
 						}
 					}
 				}
 			}
-			return false
-		},
-		func(message spi.Message) error {
-			switch message := message.(type) {
-			case readWriteModel.CBusMessageToClientExactly:
-				switch reply := message.GetReply().(type) {
-				case readWriteModel.ReplyOrConfirmationReplyExactly:
-					switch reply := reply.GetReply().(type) {
-					case readWriteModel.ReplyEncodedReplyExactly:
-						switch encodedReply := reply.GetEncodedReply().(type) {
-						case readWriteModel.EncodedReplyCALReplyExactly:
-							switch data := encodedReply.GetCalReply().GetCalData().(type) {
-							case readWriteModel.CALDataAcknowledgeExactly:
-								if data.GetParamNo() == paramNo {
-									directCommandAckChan <- true
-								}
+		}
+		return false
+	}, func(message spi.Message) error {
+		switch message := message.(type) {
+		case readWriteModel.CBusMessageToClientExactly:
+			switch reply := message.GetReply().(type) {
+			case readWriteModel.ReplyOrConfirmationReplyExactly:
+				switch reply := reply.GetReply().(type) {
+				case readWriteModel.ReplyEncodedReplyExactly:
+					switch encodedReply := reply.GetEncodedReply().(type) {
+					case readWriteModel.EncodedReplyCALReplyExactly:
+						switch data := encodedReply.GetCalReply().GetCalData().(type) {
+						case readWriteModel.CALDataAcknowledgeExactly:
+							if data.GetParamNo() == paramNo {
+								directCommandAckChan <- true
 							}
 						}
 					}
 				}
 			}
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-				c.Close()
-			}
-			directCommandAckErrorChan <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		c.GetTtl(),
-	); err != nil {
+		}
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+			c.Close()
+		}
+		directCommandAckErrorChan <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, c.GetTtl()); err != nil {
 		c.fireConnectionError(errors.Wrap(err, "Error during sending of write request"), ch)
 		return false
 	}
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index cd5840f93..59f4f1511 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -101,84 +101,105 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 
 				// Send the  over the wire
 				log.Trace().Msg("Send ")
-				if err := m.messageCodec.SendRequest(
-					messageToSend,
-					func(receivedMessage spi.Message) bool {
-						cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
-						if !ok {
-							return false
-						}
-						messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
-						if !ok {
-							return false
-						}
-						// Check if this errored
-						if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-							// This means we must handle this below
-							return true
-						}
+				if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
+					cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
+					if !ok {
+						return false
+					}
+					messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+					if !ok {
+						return false
+					}
+					// Check if this errored
+					if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+						// This means we must handle this below
+						return true
+					}
 
-						confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-						if !ok {
-							return false
-						}
-						return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
-					},
-					func(receivedMessage spi.Message) error {
-						// Convert the response into an
-						log.Trace().Msg("convert response to ")
-						cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
-						messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
-						if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-							log.Trace().Msg("We got a server failure")
-							addResponseCode(fieldNameCopy, model.PlcResponseCode_INVALID_DATA)
-							requestWasOk <- false
-							return transaction.EndRequest()
-						}
-						replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-						if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
-							var responseCode model.PlcResponseCode
-							switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
-							case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
-								responseCode = model.PlcResponseCode_REMOTE_ERROR
-							case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
-								responseCode = model.PlcResponseCode_INVALID_DATA
-							case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
-								responseCode = model.PlcResponseCode_REMOTE_BUSY
-							case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
-								responseCode = model.PlcResponseCode_INVALID_DATA
-							default:
-								panic("Every code should be mapped here")
-							}
-							log.Trace().Msgf("Was no success %s:%v", fieldNameCopy, responseCode)
-							addResponseCode(fieldNameCopy, responseCode)
-							requestWasOk <- true
-							return transaction.EndRequest()
+					confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+					if !ok {
+						return false
+					}
+					return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+				}, func(receivedMessage spi.Message) error {
+					// Convert the response into an
+					log.Trace().Msg("convert response to ")
+					cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
+					messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+					if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+						log.Trace().Msg("We got a server failure")
+						addResponseCode(fieldNameCopy, model.PlcResponseCode_INVALID_DATA)
+						requestWasOk <- false
+						return transaction.EndRequest()
+					}
+					replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+					if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+						var responseCode model.PlcResponseCode
+						switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+						case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+							responseCode = model.PlcResponseCode_REMOTE_ERROR
+						case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+							responseCode = model.PlcResponseCode_INVALID_DATA
+						case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+							responseCode = model.PlcResponseCode_REMOTE_BUSY
+						case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+							responseCode = model.PlcResponseCode_INVALID_DATA
+						default:
+							panic("Every code should be mapped here")
 						}
+						log.Trace().Msgf("Was no success %s:%v", fieldNameCopy, responseCode)
+						addResponseCode(fieldNameCopy, responseCode)
+						requestWasOk <- true
+						return transaction.EndRequest()
+					}
 
-						alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
-						// TODO: it could be double confirmed but this is not implemented yet
-						embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-						if !ok {
-							log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
-							addResponseCode(fieldNameCopy, model.PlcResponseCode_NOT_FOUND)
-							requestWasOk <- true
-							return transaction.EndRequest()
-						}
+					alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+					// TODO: it could be double confirmed but this is not implemented yet
+					embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+					if !ok {
+						log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+						addResponseCode(fieldNameCopy, model.PlcResponseCode_NOT_FOUND)
+						requestWasOk <- true
+						return transaction.EndRequest()
+					}
 
-						log.Trace().Msg("Handling confirmed data")
-						switch reply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply().(type) {
-						case readWriteModel.EncodedReplyCALReplyExactly:
-							calData := reply.GetCalReply().GetCalData()
+					log.Trace().Msg("Handling confirmed data")
+					switch reply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply().(type) {
+					case readWriteModel.EncodedReplyCALReplyExactly:
+						calData := reply.GetCalReply().GetCalData()
+						addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
+						switch calData := calData.(type) {
+						case readWriteModel.CALDataStatusExactly:
+							application := calData.GetApplication()
+							// TODO: verify application... this should be the same
+							_ = application
+							blockStart := calData.GetBlockStart()
+							// TODO: verify application... this should be the same
+							_ = blockStart
+							statusBytes := calData.GetStatusBytes()
 							addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
-							switch calData := calData.(type) {
-							case readWriteModel.CALDataStatusExactly:
-								application := calData.GetApplication()
-								// TODO: verify application... this should be the same
-								_ = application
-								blockStart := calData.GetBlockStart()
-								// TODO: verify application... this should be the same
-								_ = blockStart
+							plcListValues := make([]values.PlcValue, len(statusBytes)*4)
+							for i, statusByte := range statusBytes {
+								plcListValues[i*4+0] = spiValues.NewPlcSTRING(statusByte.GetGav0().String())
+								plcListValues[i*4+1] = spiValues.NewPlcSTRING(statusByte.GetGav1().String())
+								plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
+								plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
+							}
+							addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
+						case readWriteModel.CALDataStatusExtendedExactly:
+							coding := calData.GetCoding()
+							// TODO: verify coding... this should be the same
+							_ = coding
+							application := calData.GetApplication()
+							// TODO: verify application... this should be the same
+							_ = application
+							blockStart := calData.GetBlockStart()
+							// TODO: verify application... this should be the same
+							_ = blockStart
+							switch coding {
+							case readWriteModel.StatusCoding_BINARY_BY_THIS_SERIAL_INTERFACE:
+								fallthrough
+							case readWriteModel.StatusCoding_BINARY_BY_ELSEWHERE:
 								statusBytes := calData.GetStatusBytes()
 								addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
 								plcListValues := make([]values.PlcValue, len(statusBytes)*4)
@@ -189,67 +210,41 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 									plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
 								}
 								addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
-							case readWriteModel.CALDataStatusExtendedExactly:
-								coding := calData.GetCoding()
-								// TODO: verify coding... this should be the same
-								_ = coding
-								application := calData.GetApplication()
-								// TODO: verify application... this should be the same
-								_ = application
-								blockStart := calData.GetBlockStart()
-								// TODO: verify application... this should be the same
-								_ = blockStart
-								switch coding {
-								case readWriteModel.StatusCoding_BINARY_BY_THIS_SERIAL_INTERFACE:
-									fallthrough
-								case readWriteModel.StatusCoding_BINARY_BY_ELSEWHERE:
-									statusBytes := calData.GetStatusBytes()
-									addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
-									plcListValues := make([]values.PlcValue, len(statusBytes)*4)
-									for i, statusByte := range statusBytes {
-										plcListValues[i*4+0] = spiValues.NewPlcSTRING(statusByte.GetGav0().String())
-										plcListValues[i*4+1] = spiValues.NewPlcSTRING(statusByte.GetGav1().String())
-										plcListValues[i*4+2] = spiValues.NewPlcSTRING(statusByte.GetGav2().String())
-										plcListValues[i*4+3] = spiValues.NewPlcSTRING(statusByte.GetGav3().String())
-									}
-									addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
-								case readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE:
-									fallthrough
-								case readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE:
-									levelInformation := calData.GetLevelInformation()
-									addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
-									plcListValues := make([]values.PlcValue, len(levelInformation))
-									for i, levelInformation := range levelInformation {
-										switch levelInformation := levelInformation.(type) {
-										case readWriteModel.LevelInformationAbsentExactly:
-											plcListValues[i] = spiValues.NewPlcSTRING("is absent")
-										case readWriteModel.LevelInformationCorruptedExactly:
-											plcListValues[i] = spiValues.NewPlcSTRING("corrupted")
-										case readWriteModel.LevelInformationNormalExactly:
-											plcListValues[i] = spiValues.NewPlcUSINT(levelInformation.GetActualLevel())
-										default:
-											panic("Impossible case")
-										}
+							case readWriteModel.StatusCoding_LEVEL_BY_THIS_SERIAL_INTERFACE:
+								fallthrough
+							case readWriteModel.StatusCoding_LEVEL_BY_ELSEWHERE:
+								levelInformation := calData.GetLevelInformation()
+								addResponseCode(fieldNameCopy, model.PlcResponseCode_OK)
+								plcListValues := make([]values.PlcValue, len(levelInformation))
+								for i, levelInformation := range levelInformation {
+									switch levelInformation := levelInformation.(type) {
+									case readWriteModel.LevelInformationAbsentExactly:
+										plcListValues[i] = spiValues.NewPlcSTRING("is absent")
+									case readWriteModel.LevelInformationCorruptedExactly:
+										plcListValues[i] = spiValues.NewPlcSTRING("corrupted")
+									case readWriteModel.LevelInformationNormalExactly:
+										plcListValues[i] = spiValues.NewPlcUSINT(levelInformation.GetActualLevel())
+									default:
+										panic("Impossible case")
 									}
-									addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
 								}
+								addPlcValue(fieldNameCopy, spiValues.NewPlcList(plcListValues))
 							}
-							// TODO: how should we serialize that???
-							addPlcValue(fieldNameCopy, spiValues.NewPlcSTRING(fmt.Sprintf("%s", calData)))
-						default:
-							panic(fmt.Sprintf("All types should be mapped here. Not mapped: %T", reply))
 						}
-						requestWasOk <- true
-						return transaction.EndRequest()
-					},
-					func(err error) error {
-						log.Debug().Msgf("Error waiting for field %s", fieldNameCopy)
-						addResponseCode(fieldNameCopy, model.PlcResponseCode_REQUEST_TIMEOUT)
-						// TODO: ok or not ok?
-						requestWasOk <- true
-						return transaction.EndRequest()
-					},
-					time.Second*1); err != nil {
+						// TODO: how should we serialize that???
+						addPlcValue(fieldNameCopy, spiValues.NewPlcSTRING(fmt.Sprintf("%s", calData)))
+					default:
+						panic(fmt.Sprintf("All types should be mapped here. Not mapped: %T", reply))
+					}
+					requestWasOk <- true
+					return transaction.EndRequest()
+				}, func(err error) error {
+					log.Debug().Msgf("Error waiting for field %s", fieldNameCopy)
+					addResponseCode(fieldNameCopy, model.PlcResponseCode_REQUEST_TIMEOUT)
+					// TODO: ok or not ok?
+					requestWasOk <- true
+					return transaction.EndRequest()
+				}, time.Second*1); err != nil {
 					log.Debug().Err(err).Msgf("Error sending message for field %s", fieldNameCopy)
 					addResponseCode(fieldNameCopy, model.PlcResponseCode_INTERNAL_ERROR)
 					_ = transaction.EndRequest()
diff --git a/plc4go/internal/eip/Connection.go b/plc4go/internal/eip/Connection.go
index 362417d14..4db0d2381 100644
--- a/plc4go/internal/eip/Connection.go
+++ b/plc4go/internal/eip/Connection.go
@@ -20,6 +20,7 @@
 package eip
 
 import (
+	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -84,6 +85,8 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
 }
 
 func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+	// TODO: use proper context
+	ctx := context.TODO()
 	log.Trace().Msg("Connecting")
 	ch := make(chan plc4go.PlcConnectionConnectResult)
 	go func() {
@@ -94,7 +97,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 
 		// For testing purposes we can skip the waiting for a complete connection
 		if !m.driverContext.awaitSetupComplete {
-			go m.setupConnection(ch)
+			go m.setupConnection(ctx, ch)
 			log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
 			// Here we write directly and don't wait till the connection is "really" connected
 			// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
@@ -103,69 +106,59 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 			return
 		}
 
-		m.setupConnection(ch)
+		m.setupConnection(ctx, ch)
 	}()
 	return ch
 }
 
 func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
+	// TODO: use proper context
+	ctx := context.TODO()
 	result := make(chan plc4go.PlcConnectionCloseResult)
 	go func() {
 		log.Debug().Msg("Sending UnregisterSession EIP Packet")
-		_ = m.messageCodec.SendRequest(
-			readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, make([]byte, 8), 0),
-			func(message spi.Message) bool {
-				return true
-			},
-			func(message spi.Message) error {
-				return nil
-			},
-			func(err error) error {
-				return nil
-			},
-			m.GetTtl(),
-		) //Unregister gets no response
+		_ = m.messageCodec.SendRequest(ctx, readWriteModel.NewEipDisconnectRequest(m.sessionHandle, 0, make([]byte, 8), 0), func(message spi.Message) bool {
+			return true
+		}, func(message spi.Message) error {
+			return nil
+		}, func(err error) error {
+			return nil
+		}, m.GetTtl()) //Unregister gets no response
 		log.Debug().Msgf("Unregistred Session %d", m.sessionHandle)
 	}()
 	return result
 }
 
-func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) {
+func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
 	log.Debug().Msg("Sending EIP Connection Request")
-	if err := m.messageCodec.SendRequest(
-		readWriteModel.NewEipConnectionRequest(0, 0, make([]byte, 8), 0),
-		func(message spi.Message) bool {
-			eipPacket := message.(readWriteModel.EipPacket)
-			if eipPacket == nil {
-				return false
-			}
-			eipPacketConnectionRequest := eipPacket.(readWriteModel.EipConnectionRequest)
-			return eipPacketConnectionRequest != nil
-		},
-		func(message spi.Message) error {
-			eipPacket := message.(readWriteModel.EipPacket)
-			if eipPacket.GetStatus() == 0 {
-				m.sessionHandle = eipPacket.GetSessionHandle()
-				m.senderContext = eipPacket.GetSenderContext()
-				log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
-				// Send an event that connection setup is complete.
-				m.fireConnected(ch)
-			} else {
-
-			}
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-				m.Close()
-			}
-			m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
-			return nil
-		},
-		m.GetTtl(),
-	); err != nil {
+	if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewEipConnectionRequest(0, 0, make([]byte, 8), 0), func(message spi.Message) bool {
+		eipPacket := message.(readWriteModel.EipPacket)
+		if eipPacket == nil {
+			return false
+		}
+		eipPacketConnectionRequest := eipPacket.(readWriteModel.EipConnectionRequest)
+		return eipPacketConnectionRequest != nil
+	}, func(message spi.Message) error {
+		eipPacket := message.(readWriteModel.EipPacket)
+		if eipPacket.GetStatus() == 0 {
+			m.sessionHandle = eipPacket.GetSessionHandle()
+			m.senderContext = eipPacket.GetSenderContext()
+			log.Debug().Msgf("Got assigned with Session %d", m.sessionHandle)
+			// Send an event that connection setup is complete.
+			m.fireConnected(ch)
+		} else {
+
+		}
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+			m.Close()
+		}
+		m.fireConnectionError(errors.Wrap(err, "got error processing request"), ch)
+		return nil
+	}, m.GetTtl()); err != nil {
 		m.fireConnectionError(errors.Wrap(err, "Error during sending of EIP Connection Request"), ch)
 	}
 }
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 65caf6b68..f20058abb 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -115,60 +115,55 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 			transaction.Submit(func() {
 				// Send the  over the wire
 				log.Trace().Msg("Send ")
-				if err := m.messageCodec.SendRequest(
-					pkt,
-					func(message spi.Message) bool {
-						eipPacket := message.(readWriteModel.EipPacket)
-						if eipPacket == nil {
-							return false
-						}
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						if cipRRData == nil {
-							return false
-						}
-						if eipPacket.GetSessionHandle() != *m.sessionHandle {
-							return false
-						}
-						multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
-						if multipleServiceResponse == nil {
-							return false
-						}
-						if multipleServiceResponse.GetServiceNb() != nb {
-							return false
-						}
-						return true
-					},
-					func(message spi.Message) error {
-						// Convert the response into an
-						log.Trace().Msg("convert response to ")
-						eipPacket := message.(readWriteModel.EipPacket)
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
-						// Convert the eip response into a PLC4X response
-						log.Trace().Msg("convert response to PLC4X response")
-						readResponse, err := m.ToPlc4xReadResponse(multipleServiceResponse, readRequest)
+				if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
+					eipPacket := message.(readWriteModel.EipPacket)
+					if eipPacket == nil {
+						return false
+					}
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					if cipRRData == nil {
+						return false
+					}
+					if eipPacket.GetSessionHandle() != *m.sessionHandle {
+						return false
+					}
+					multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+					if multipleServiceResponse == nil {
+						return false
+					}
+					if multipleServiceResponse.GetServiceNb() != nb {
+						return false
+					}
+					return true
+				}, func(message spi.Message) error {
+					// Convert the response into an
+					log.Trace().Msg("convert response to ")
+					eipPacket := message.(readWriteModel.EipPacket)
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+					// Convert the eip response into a PLC4X response
+					log.Trace().Msg("convert response to PLC4X response")
+					readResponse, err := m.ToPlc4xReadResponse(multipleServiceResponse, readRequest)
 
-						if err != nil {
-							result <- &plc4goModel.DefaultPlcReadRequestResult{
-								Request: readRequest,
-								Err:     errors.Wrap(err, "Error decoding response"),
-							}
-							return transaction.EndRequest()
-						}
-						result <- &plc4goModel.DefaultPlcReadRequestResult{
-							Request:  readRequest,
-							Response: readResponse,
-						}
-						return transaction.EndRequest()
-					},
-					func(err error) error {
+					if err != nil {
 						result <- &plc4goModel.DefaultPlcReadRequestResult{
 							Request: readRequest,
-							Err:     errors.Wrap(err, "got timeout while waiting for response"),
+							Err:     errors.Wrap(err, "Error decoding response"),
 						}
 						return transaction.EndRequest()
-					},
-					time.Second*1); err != nil {
+					}
+					result <- &plc4goModel.DefaultPlcReadRequestResult{
+						Request:  readRequest,
+						Response: readResponse,
+					}
+					return transaction.EndRequest()
+				}, func(err error) error {
+					result <- &plc4goModel.DefaultPlcReadRequestResult{
+						Request: readRequest,
+						Err:     errors.Wrap(err, "got timeout while waiting for response"),
+					}
+					return transaction.EndRequest()
+				}, time.Second*1); err != nil {
 					result <- &plc4goModel.DefaultPlcReadRequestResult{
 						Request:  readRequest,
 						Response: nil,
@@ -201,57 +196,52 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 			transaction.Submit(func() {
 				// Send the  over the wire
 				log.Trace().Msg("Send ")
-				if err := m.messageCodec.SendRequest(
-					pkt,
-					func(message spi.Message) bool {
-						eipPacket := message.(readWriteModel.EipPacket)
-						if eipPacket == nil {
-							return false
-						}
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						if cipRRData == nil {
-							return false
-						}
-						if eipPacket.GetSessionHandle() != *m.sessionHandle {
-							return false
-						}
-						cipReadResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipReadResponse)
-						if cipReadResponse == nil {
-							return false
-						}
-						return true
-					},
-					func(message spi.Message) error {
-						// Convert the response into an
-						log.Trace().Msg("convert response to ")
-						eipPacket := message.(readWriteModel.EipPacket)
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						cipReadResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipReadResponse)
-						// Convert the eip response into a PLC4X response
-						log.Trace().Msg("convert response to PLC4X response")
-						readResponse, err := m.ToPlc4xReadResponse(cipReadResponse, readRequest)
+				if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
+					eipPacket := message.(readWriteModel.EipPacket)
+					if eipPacket == nil {
+						return false
+					}
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					if cipRRData == nil {
+						return false
+					}
+					if eipPacket.GetSessionHandle() != *m.sessionHandle {
+						return false
+					}
+					cipReadResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipReadResponse)
+					if cipReadResponse == nil {
+						return false
+					}
+					return true
+				}, func(message spi.Message) error {
+					// Convert the response into an
+					log.Trace().Msg("convert response to ")
+					eipPacket := message.(readWriteModel.EipPacket)
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					cipReadResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipReadResponse)
+					// Convert the eip response into a PLC4X response
+					log.Trace().Msg("convert response to PLC4X response")
+					readResponse, err := m.ToPlc4xReadResponse(cipReadResponse, readRequest)
 
-						if err != nil {
-							result <- &plc4goModel.DefaultPlcReadRequestResult{
-								Request: readRequest,
-								Err:     errors.Wrap(err, "Error decoding response"),
-							}
-							return transaction.EndRequest()
-						}
-						result <- &plc4goModel.DefaultPlcReadRequestResult{
-							Request:  readRequest,
-							Response: readResponse,
-						}
-						return transaction.EndRequest()
-					},
-					func(err error) error {
+					if err != nil {
 						result <- &plc4goModel.DefaultPlcReadRequestResult{
 							Request: readRequest,
-							Err:     errors.Wrap(err, "got timeout while waiting for response"),
+							Err:     errors.Wrap(err, "Error decoding response"),
 						}
 						return transaction.EndRequest()
-					},
-					time.Second*1); err != nil {
+					}
+					result <- &plc4goModel.DefaultPlcReadRequestResult{
+						Request:  readRequest,
+						Response: readResponse,
+					}
+					return transaction.EndRequest()
+				}, func(err error) error {
+					result <- &plc4goModel.DefaultPlcReadRequestResult{
+						Request: readRequest,
+						Err:     errors.Wrap(err, "got timeout while waiting for response"),
+					}
+					return transaction.EndRequest()
+				}, time.Second*1); err != nil {
 					result <- &plc4goModel.DefaultPlcReadRequestResult{
 						Request:  readRequest,
 						Response: nil,
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 7343b07af..86101cedf 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -121,57 +121,52 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 			transaction := m.tm.StartTransaction()
 			transaction.Submit(func() {
 				// Send the  over the wire
-				if err := m.messageCodec.SendRequest(
-					pkt,
-					func(message spi.Message) bool {
-						eipPacket := message.(readWriteModel.EipPacket)
-						if eipPacket == nil {
-							return false
-						}
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						if cipRRData == nil {
-							return false
-						}
-						if eipPacket.GetSessionHandle() != *m.sessionHandle {
-							return false
-						}
-						cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
-						if cipWriteResponse == nil {
-							return false
-						}
-						return true
-					},
-					func(message spi.Message) error {
-						// Convert the response into an
-						log.Trace().Msg("convert response to ")
-						eipPacket := message.(readWriteModel.EipPacket)
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
-						// Convert the eip response into a PLC4X response
-						log.Trace().Msg("convert response to PLC4X response")
-						readResponse, err := m.ToPlc4xWriteResponse(cipWriteResponse, writeRequest)
+				if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
+					eipPacket := message.(readWriteModel.EipPacket)
+					if eipPacket == nil {
+						return false
+					}
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					if cipRRData == nil {
+						return false
+					}
+					if eipPacket.GetSessionHandle() != *m.sessionHandle {
+						return false
+					}
+					cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
+					if cipWriteResponse == nil {
+						return false
+					}
+					return true
+				}, func(message spi.Message) error {
+					// Convert the response into an
+					log.Trace().Msg("convert response to ")
+					eipPacket := message.(readWriteModel.EipPacket)
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					cipWriteResponse := cipRRData.GetExchange().GetService().(readWriteModel.CipWriteResponse)
+					// Convert the eip response into a PLC4X response
+					log.Trace().Msg("convert response to PLC4X response")
+					readResponse, err := m.ToPlc4xWriteResponse(cipWriteResponse, writeRequest)
 
-						if err != nil {
-							result <- &plc4goModel.DefaultPlcWriteRequestResult{
-								Request: writeRequest,
-								Err:     errors.Wrap(err, "Error decoding response"),
-							}
-							return transaction.EndRequest()
-						}
-						result <- &plc4goModel.DefaultPlcWriteRequestResult{
-							Request:  writeRequest,
-							Response: readResponse,
-						}
-						return transaction.EndRequest()
-					},
-					func(err error) error {
+					if err != nil {
 						result <- &plc4goModel.DefaultPlcWriteRequestResult{
 							Request: writeRequest,
-							Err:     errors.New("got timeout while waiting for response"),
+							Err:     errors.Wrap(err, "Error decoding response"),
 						}
 						return transaction.EndRequest()
-					},
-					time.Second*1); err != nil {
+					}
+					result <- &plc4goModel.DefaultPlcWriteRequestResult{
+						Request:  writeRequest,
+						Response: readResponse,
+					}
+					return transaction.EndRequest()
+				}, func(err error) error {
+					result <- &plc4goModel.DefaultPlcWriteRequestResult{
+						Request: writeRequest,
+						Err:     errors.New("got timeout while waiting for response"),
+					}
+					return transaction.EndRequest()
+				}, time.Second*1); err != nil {
 					result <- &plc4goModel.DefaultPlcWriteRequestResult{
 						Request:  writeRequest,
 						Response: nil,
@@ -218,60 +213,55 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 			transaction := m.tm.StartTransaction()
 			transaction.Submit(func() {
 				// Send the  over the wire
-				if err := m.messageCodec.SendRequest(
-					pkt,
-					func(message spi.Message) bool {
-						eipPacket := message.(readWriteModel.EipPacket)
-						if eipPacket == nil {
-							return false
-						}
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						if cipRRData == nil {
-							return false
-						}
-						if eipPacket.GetSessionHandle() != *m.sessionHandle {
-							return false
-						}
-						multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
-						if multipleServiceResponse == nil {
-							return false
-						}
-						if multipleServiceResponse.GetServiceNb() != nb {
-							return false
-						}
-						return true
-					},
-					func(message spi.Message) error {
-						// Convert the response into an
-						log.Trace().Msg("convert response to ")
-						eipPacket := message.(readWriteModel.EipPacket)
-						cipRRData := eipPacket.(readWriteModel.CipRRData)
-						multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
-						// Convert the eip response into a PLC4X response
-						log.Trace().Msg("convert response to PLC4X response")
-						readResponse, err := m.ToPlc4xWriteResponse(multipleServiceResponse, writeRequest)
+				if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
+					eipPacket := message.(readWriteModel.EipPacket)
+					if eipPacket == nil {
+						return false
+					}
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					if cipRRData == nil {
+						return false
+					}
+					if eipPacket.GetSessionHandle() != *m.sessionHandle {
+						return false
+					}
+					multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+					if multipleServiceResponse == nil {
+						return false
+					}
+					if multipleServiceResponse.GetServiceNb() != nb {
+						return false
+					}
+					return true
+				}, func(message spi.Message) error {
+					// Convert the response into an EIP packet
+					log.Trace().Msg("convert response to ")
+					eipPacket := message.(readWriteModel.EipPacket)
+					cipRRData := eipPacket.(readWriteModel.CipRRData)
+					multipleServiceResponse := cipRRData.GetExchange().GetService().(readWriteModel.MultipleServiceResponse)
+					// Convert the eip response into a PLC4X response
+					log.Trace().Msg("convert response to PLC4X response")
+					readResponse, err := m.ToPlc4xWriteResponse(multipleServiceResponse, writeRequest)
 
-						if err != nil {
-							result <- &plc4goModel.DefaultPlcWriteRequestResult{
-								Request: writeRequest,
-								Err:     errors.Wrap(err, "Error decoding response"),
-							}
-							return transaction.EndRequest()
-						}
-						result <- &plc4goModel.DefaultPlcWriteRequestResult{
-							Request:  writeRequest,
-							Response: readResponse,
-						}
-						return transaction.EndRequest()
-					},
-					func(err error) error {
+					if err != nil {
 						result <- &plc4goModel.DefaultPlcWriteRequestResult{
 							Request: writeRequest,
-							Err:     errors.New("got timeout while waiting for response"),
+							Err:     errors.Wrap(err, "Error decoding response"),
 						}
 						return transaction.EndRequest()
-					},
-					time.Second*1); err != nil {
+					}
+					result <- &plc4goModel.DefaultPlcWriteRequestResult{
+						Request:  writeRequest,
+						Response: readResponse,
+					}
+					return transaction.EndRequest()
+				}, func(err error) error {
+					result <- &plc4goModel.DefaultPlcWriteRequestResult{
+						Request: writeRequest,
+						Err:     errors.New("got timeout while waiting for response"),
+					}
+					return transaction.EndRequest()
+				}, time.Second*1); err != nil {
 					result <- &plc4goModel.DefaultPlcWriteRequestResult{
 						Request:  writeRequest,
 						Response: nil,
diff --git a/plc4go/internal/knxnetip/Browser.go b/plc4go/internal/knxnetip/Browser.go
index 74640234b..75fef4af6 100644
--- a/plc4go/internal/knxnetip/Browser.go
+++ b/plc4go/internal/knxnetip/Browser.go
@@ -56,10 +56,9 @@ func NewBrowser(connection *Connection, messageCodec spi.MessageCodec) *Browser
 }
 
 func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrowseRequest, interceptor func(result apiModel.PlcBrowseEvent) bool, fieldName string, field apiModel.PlcField) (apiModel.PlcResponseCode, []apiModel.PlcBrowseFoundField) {
-	// TODO: handle ctx
 	switch field.(type) {
 	case DeviceQueryField:
-		queryResults, err := m.executeDeviceQuery(field.(DeviceQueryField), browseRequest, fieldName, interceptor)
+		queryResults, err := m.executeDeviceQuery(ctx, field.(DeviceQueryField), browseRequest, fieldName, interceptor)
 		if err != nil {
 			log.Warn().Err(err).Msg("Error executing device query")
 			return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
@@ -67,7 +66,7 @@ func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrow
 			return apiModel.PlcResponseCode_OK, queryResults
 		}
 	case CommunicationObjectQueryField:
-		queryResults, err := m.executeCommunicationObjectQuery(field.(CommunicationObjectQueryField))
+		queryResults, err := m.executeCommunicationObjectQuery(ctx, field.(CommunicationObjectQueryField))
 		if err != nil {
 			log.Warn().Err(err).Msg("Error executing device query")
 			return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
@@ -79,7 +78,7 @@ func (m Browser) BrowseField(ctx context.Context, browseRequest apiModel.PlcBrow
 	}
 }
 
-func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiModel.PlcBrowseRequest, fieldName string, interceptor func(result apiModel.PlcBrowseEvent) bool) ([]apiModel.PlcBrowseFoundField, error) {
+func (m Browser) executeDeviceQuery(ctx context.Context, field DeviceQueryField, browseRequest apiModel.PlcBrowseRequest, fieldName string, interceptor func(result apiModel.PlcBrowseEvent) bool) ([]apiModel.PlcBrowseFoundField, error) {
 	// Create a list of address strings, which doesn't contain any ranges, lists or wildcards
 	knxAddresses, err := m.calculateAddresses(field)
 	if err != nil {
@@ -94,7 +93,7 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
 	for _, knxAddress := range knxAddresses {
 		// Send a connection request to the device
 		connectTtlTimer := time.NewTimer(m.connection.defaultTtl)
-		deviceConnections := m.connection.DeviceConnect(knxAddress)
+		deviceConnections := m.connection.DeviceConnect(ctx, knxAddress)
 		select {
 		case deviceConnection := <-deviceConnections:
 			if !connectTtlTimer.Stop() {
@@ -129,7 +128,7 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
 				}
 
 				disconnectTtlTimer := time.NewTimer(m.connection.defaultTtl * 10)
-				deviceDisconnections := m.connection.DeviceDisconnect(knxAddress)
+				deviceDisconnections := m.connection.DeviceDisconnect(ctx, knxAddress)
 				select {
 				case _ = <-deviceDisconnections:
 					if !disconnectTtlTimer.Stop() {
@@ -150,7 +149,7 @@ func (m Browser) executeDeviceQuery(field DeviceQueryField, browseRequest apiMod
 	return queryResults, nil
 }
 
-func (m Browser) executeCommunicationObjectQuery(field CommunicationObjectQueryField) ([]apiModel.PlcBrowseFoundField, error) {
+func (m Browser) executeCommunicationObjectQuery(ctx context.Context, field CommunicationObjectQueryField) ([]apiModel.PlcBrowseFoundField, error) {
 	var results []apiModel.PlcBrowseFoundField
 
 	knxAddress := field.toKnxAddress()
@@ -158,7 +157,7 @@ func (m Browser) executeCommunicationObjectQuery(field CommunicationObjectQueryF
 
 	// If we have a building Key, try that to login in order to access protected
 	if m.connection.buildingKey != nil {
-		arr := m.connection.DeviceAuthenticate(knxAddress, m.connection.buildingKey)
+		arr := m.connection.DeviceAuthenticate(ctx, knxAddress, m.connection.buildingKey)
 		<-arr
 	}
 
diff --git a/plc4go/internal/knxnetip/Connection.go b/plc4go/internal/knxnetip/Connection.go
index 179e873a8..98b298ee3 100644
--- a/plc4go/internal/knxnetip/Connection.go
+++ b/plc4go/internal/knxnetip/Connection.go
@@ -21,6 +21,7 @@ package knxnetip
 
 import (
 	"bytes"
+	"context"
 	"encoding/hex"
 	"fmt"
 	_default "github.com/apache/plc4x/plc4go/spi/default"
@@ -214,6 +215,8 @@ func (m *Connection) GetTracer() *spi.Tracer {
 }
 
 func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+	// TODO: use proper context
+	ctx := context.TODO()
 	result := make(chan plc4go.PlcConnectionConnectResult)
 	sendResult := func(connection plc4go.PlcConnection, err error) {
 		result <- _default.NewDefaultPlcConnectionConnectResult(connection, err)
@@ -228,7 +231,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 		}
 
 		// Send a search request before connecting to the device.
-		searchResponse, err := m.sendGatewaySearchRequest()
+		searchResponse, err := m.sendGatewaySearchRequest(ctx)
 		if err != nil {
 			m.doSomethingAndClose(func() { sendResult(nil, errors.Wrap(err, "error discovering device capabilities")) })
 			return
@@ -261,7 +264,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 		// Via this connection we then get access to the entire KNX network this Gateway is connected to.
 		if supportsTunneling {
 			// As soon as we got a successful search-response back, send a connection request.
-			connectionResponse, err := m.sendGatewayConnectionRequest()
+			connectionResponse, err := m.sendGatewayConnectionRequest(ctx)
 			if err != nil {
 				m.doSomethingAndClose(func() { sendResult(nil, errors.Wrap(err, "error connecting to device")) })
 				return
@@ -334,7 +337,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 								}
 							}
 						} else {
-							m.handleIncomingTunnelingRequest(tunnelingRequest)
+							m.handleIncomingTunnelingRequest(ctx, tunnelingRequest)
 						}
 					}
 					log.Warn().Msg("Tunneling handler shat down")
@@ -379,6 +382,8 @@ func (m *Connection) BlockingClose() {
 }
 
 func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
+	// TODO: use proper context
+	ctx := context.TODO()
 	result := make(chan plc4go.PlcConnectionCloseResult)
 
 	go func() {
@@ -390,7 +395,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
 		// Disconnect from all knx devices we are still connected to.
 		for targetAddress := range m.DeviceConnections {
 			ttlTimer := time.NewTimer(m.defaultTtl)
-			disconnects := m.DeviceDisconnect(targetAddress)
+			disconnects := m.DeviceDisconnect(ctx, targetAddress)
 			select {
 			case _ = <-disconnects:
 				if !ttlTimer.Stop() {
@@ -404,7 +409,7 @@ func (m *Connection) Close() <-chan plc4go.PlcConnectionCloseResult {
 		}
 
 		// Send a disconnect request from the gateway.
-		_, err := m.sendGatewayDisconnectionRequest()
+		_, err := m.sendGatewayDisconnectionRequest(ctx)
 		if err != nil {
 			result <- _default.NewDefaultPlcConnectionCloseResult(m, errors.Wrap(err, "got an error while disconnecting"))
 		} else {
@@ -435,11 +440,13 @@ func (m *Connection) IsConnected() bool {
 }
 
 func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
+	// TODO: use proper context
+	ctx := context.TODO()
 	result := make(chan plc4go.PlcConnectionPingResult)
 
 	go func() {
 		// Send the connection state request
-		_, err := m.sendConnectionStateRequest()
+		_, err := m.sendConnectionStateRequest(ctx)
 		if err != nil {
 			result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got an error"))
 		} else {
diff --git a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
index 7694b1a2b..56b563559 100644
--- a/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionDriverSpecificOperations.go
@@ -20,6 +20,7 @@
 package knxnetip
 
 import (
+	"context"
 	"math"
 	"strconv"
 	"time"
@@ -43,7 +44,7 @@ import (
 // They expect the called private functions to handle timeouts, so these will not.
 ///////////////////////////////////////////////////////////////////////////////////////////////////////
 
-func (m *Connection) ReadGroupAddress(groupAddress []byte, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
+func (m *Connection) ReadGroupAddress(ctx context.Context, groupAddress []byte, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
 	result := make(chan KnxReadResult)
 
 	sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
@@ -63,7 +64,7 @@ func (m *Connection) ReadGroupAddress(groupAddress []byte, datapointType *driver
 	}
 
 	go func() {
-		groupAddressReadResponse, err := m.sendGroupAddressReadRequest(groupAddress)
+		groupAddressReadResponse, err := m.sendGroupAddressReadRequest(ctx, groupAddress)
 		if err != nil {
 			sendResponse(nil, 0, errors.Wrap(err, "error reading group address"))
 			return
@@ -99,7 +100,7 @@ func (m *Connection) ReadGroupAddress(groupAddress []byte, datapointType *driver
 	return result
 }
 
-func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan KnxDeviceConnectResult {
+func (m *Connection) DeviceConnect(ctx context.Context, targetAddress driverModel.KnxAddress) <-chan KnxDeviceConnectResult {
 	result := make(chan KnxDeviceConnectResult)
 
 	sendResponse := func(connection *KnxDeviceConnection, err error) {
@@ -125,7 +126,7 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
 		}
 
 		// First send a connection request
-		controlConnectResponse, err := m.sendDeviceConnectionRequest(targetAddress)
+		controlConnectResponse, err := m.sendDeviceConnectionRequest(ctx, targetAddress)
 		if err != nil {
 			sendResponse(nil, errors.Wrap(err, "error creating device connection"))
 			return
@@ -145,7 +146,7 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
 		m.DeviceConnections[targetAddress] = connection
 
 		// If the connection request was successful, try to read the device-descriptor
-		deviceDescriptorResponse, err := m.sendDeviceDeviceDescriptorReadRequest(targetAddress)
+		deviceDescriptorResponse, err := m.sendDeviceDeviceDescriptorReadRequest(ctx, targetAddress)
 		if err != nil {
 			sendResponse(nil, errors.New(
 				"error reading device descriptor: "+err.Error()))
@@ -161,7 +162,7 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
 		// default APDU Size of 15
 		// Defined in: 03_05_01 Resources v01.09.03 AS Page 40
 		deviceApduSize := uint16(15)
-		propertyValueResponse, err := m.sendDevicePropertyReadRequest(targetAddress, 0, 56, 1, 1)
+		propertyValueResponse, err := m.sendDevicePropertyReadRequest(ctx, targetAddress, 0, 56, 1, 1)
 		if err == nil {
 			// If the count is 0, then this property doesn't exist or the user has no permission to read it.
 			// In all other cases we expect the response to contain the value.
@@ -190,7 +191,7 @@ func (m *Connection) DeviceConnect(targetAddress driverModel.KnxAddress) <-chan
 	return result
 }
 
-func (m *Connection) DeviceDisconnect(targetAddress driverModel.KnxAddress) <-chan KnxDeviceDisconnectResult {
+func (m *Connection) DeviceDisconnect(ctx context.Context, targetAddress driverModel.KnxAddress) <-chan KnxDeviceDisconnectResult {
 	result := make(chan KnxDeviceDisconnectResult)
 
 	sendResponse := func(connection *KnxDeviceConnection, err error) {
@@ -210,7 +211,7 @@ func (m *Connection) DeviceDisconnect(targetAddress driverModel.KnxAddress) <-ch
 
 	go func() {
 		if connection, ok := m.DeviceConnections[targetAddress]; ok {
-			_, err := m.sendDeviceDisconnectionRequest(targetAddress)
+			_, err := m.sendDeviceDisconnectionRequest(ctx, targetAddress)
 
 			// Remove the connection from the list.
 			delete(m.DeviceConnections, targetAddress)
@@ -224,7 +225,7 @@ func (m *Connection) DeviceDisconnect(targetAddress driverModel.KnxAddress) <-ch
 	return result
 }
 
-func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, buildingKey []byte) <-chan KnxDeviceAuthenticateResult {
+func (m *Connection) DeviceAuthenticate(ctx context.Context, targetAddress driverModel.KnxAddress, buildingKey []byte) <-chan KnxDeviceAuthenticateResult {
 	result := make(chan KnxDeviceAuthenticateResult)
 
 	sendResponse := func(err error) {
@@ -246,7 +247,7 @@ func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, bu
 		// if not, create a new one.
 		connection, ok := m.DeviceConnections[targetAddress]
 		if !ok {
-			connections := m.DeviceConnect(targetAddress)
+			connections := m.DeviceConnect(ctx, targetAddress)
 			deviceConnectionResult := <-connections
 			// If we didn't get a connect, abort
 			if deviceConnectionResult.err != nil {
@@ -260,7 +261,7 @@ func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, bu
 			return
 		}
 		authenticationLevel := uint8(0)
-		authenticationResponse, err := m.sendDeviceAuthentication(targetAddress, authenticationLevel, buildingKey)
+		authenticationResponse, err := m.sendDeviceAuthentication(ctx, targetAddress, authenticationLevel, buildingKey)
 		if err == nil {
 			if authenticationResponse.GetLevel() == authenticationLevel {
 				sendResponse(nil)
@@ -277,7 +278,7 @@ func (m *Connection) DeviceAuthenticate(targetAddress driverModel.KnxAddress, bu
 	return result
 }
 
-func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) <-chan KnxReadResult {
+func (m *Connection) DeviceReadProperty(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) <-chan KnxReadResult {
 	result := make(chan KnxReadResult)
 
 	sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
@@ -301,7 +302,7 @@ func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, ob
 		// if not, create a new one.
 		connection, ok := m.DeviceConnections[targetAddress]
 		if !ok {
-			connections := m.DeviceConnect(targetAddress)
+			connections := m.DeviceConnect(ctx, targetAddress)
 			deviceConnectionResult := <-connections
 			// If we didn't get a connect, abort
 			if deviceConnectionResult.err != nil {
@@ -317,7 +318,7 @@ func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, ob
 			sendResponse(nil, 0, errors.New("unable to connect to device"))
 			return
 		}
-		propertyValueResponse, err := m.sendDevicePropertyReadRequest(targetAddress, objectId, propertyId, propertyIndex, numElements)
+		propertyValueResponse, err := m.sendDevicePropertyReadRequest(ctx, targetAddress, objectId, propertyId, propertyIndex, numElements)
 		if err != nil {
 			sendResponse(nil, 0, err)
 			return
@@ -357,7 +358,7 @@ func (m *Connection) DeviceReadProperty(targetAddress driverModel.KnxAddress, ob
 	return result
 }
 
-func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) <-chan KnxReadResult {
+func (m *Connection) DeviceReadPropertyDescriptor(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) <-chan KnxReadResult {
 	result := make(chan KnxReadResult)
 
 	sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
@@ -381,7 +382,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxA
 		// if not, create a new one.
 		connection, ok := m.DeviceConnections[targetAddress]
 		if !ok {
-			connections := m.DeviceConnect(targetAddress)
+			connections := m.DeviceConnect(ctx, targetAddress)
 			deviceConnectionResult := <-connections
 			// If we didn't get a connect, abort
 			if deviceConnectionResult.err != nil {
@@ -398,7 +399,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxA
 			return
 		}
 		// If we successfully got a connection, read the property
-		propertyDescriptionResponse, err := m.sendDevicePropertyDescriptionReadRequest(targetAddress, objectId, propertyId)
+		propertyDescriptionResponse, err := m.sendDevicePropertyDescriptionReadRequest(ctx, targetAddress, objectId, propertyId)
 		if err != nil {
 			sendResponse(nil, 0, err)
 			return
@@ -417,7 +418,7 @@ func (m *Connection) DeviceReadPropertyDescriptor(targetAddress driverModel.KnxA
 	return result
 }
 
-func (m *Connection) DeviceReadMemory(targetAddress driverModel.KnxAddress, address uint16, numElements uint8, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
+func (m *Connection) DeviceReadMemory(ctx context.Context, targetAddress driverModel.KnxAddress, address uint16, numElements uint8, datapointType *driverModel.KnxDatapointType) <-chan KnxReadResult {
 	result := make(chan KnxReadResult)
 
 	sendResponse := func(value *values.PlcValue, numItems uint8, err error) {
@@ -447,7 +448,7 @@ func (m *Connection) DeviceReadMemory(targetAddress driverModel.KnxAddress, addr
 		// if not, create a new one.
 		connection, ok := m.DeviceConnections[targetAddress]
 		if !ok {
-			connections := m.DeviceConnect(targetAddress)
+			connections := m.DeviceConnect(ctx, targetAddress)
 			deviceConnectionResult := <-connections
 			// If we didn't get a connect, abort
 			if deviceConnectionResult.err != nil {
@@ -477,7 +478,7 @@ func (m *Connection) DeviceReadMemory(targetAddress driverModel.KnxAddress, addr
 			maxNumElementsPerRequest := uint8(math.Floor(float64(maxNumBytes / elementSize)))
 			numElements := uint8(math.Min(float64(remainingRequestElements), float64(maxNumElementsPerRequest)))
 			numBytes := numElements * uint8(math.Max(float64(1), float64(datapointType.DatapointMainType().SizeInBits()/8)))
-			memoryReadResponse, err := m.sendDeviceMemoryReadRequest(targetAddress, curStartingAddress, numBytes)
+			memoryReadResponse, err := m.sendDeviceMemoryReadRequest(ctx, targetAddress, curStartingAddress, numBytes)
 			if err != nil {
 				// TODO: do we need to send a response here
 				return
diff --git a/plc4go/internal/knxnetip/ConnectionHelper.go b/plc4go/internal/knxnetip/ConnectionHelper.go
index 1794c6bd3..11cc6c42b 100644
--- a/plc4go/internal/knxnetip/ConnectionHelper.go
+++ b/plc4go/internal/knxnetip/ConnectionHelper.go
@@ -20,6 +20,7 @@
 package knxnetip
 
 import (
+	"context"
 	"fmt"
 	"math"
 	"net"
@@ -50,7 +51,7 @@ func (m *Connection) castIpToKnxAddress(ip net.IP) driverModel.IPAddress {
 	return driverModel.NewIPAddress(ip[len(ip)-4:])
 }
 
-func (m *Connection) handleIncomingTunnelingRequest(tunnelingRequest driverModel.TunnelingRequest) {
+func (m *Connection) handleIncomingTunnelingRequest(ctx context.Context, tunnelingRequest driverModel.TunnelingRequest) {
 	go func() {
 		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
 		if lDataInd == nil {
@@ -83,7 +84,7 @@ func (m *Connection) handleIncomingTunnelingRequest(tunnelingRequest driverModel
 					targetAddress := ByteArrayToKnxAddress(dataFrame.GetDestinationAddress())
 					if targetAddress == m.ClientKnxAddress {
 						log.Info().Msg("Acknowleding an unhandled data message.")
-						_ = m.sendDeviceAck(dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
+						_ = m.sendDeviceAck(ctx, dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
 					}
 				}
 			case driverModel.ApduControlContainer:
@@ -94,7 +95,7 @@ func (m *Connection) handleIncomingTunnelingRequest(tunnelingRequest driverModel
 				targetAddress := ByteArrayToKnxAddress(dataFrame.GetDestinationAddress())
 				if targetAddress == m.ClientKnxAddress {
 					log.Info().Msg("Acknowleding an unhandled contol message.")
-					_ = m.sendDeviceAck(dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
+					_ = m.sendDeviceAck(ctx, dataFrame.GetSourceAddress(), dataFrame.GetApdu().GetCounter(), func(err error) {})
 				}
 			}
 		default:
diff --git a/plc4go/internal/knxnetip/ConnectionInternalOperations.go b/plc4go/internal/knxnetip/ConnectionInternalOperations.go
index 546e52ba9..c9a1e129b 100644
--- a/plc4go/internal/knxnetip/ConnectionInternalOperations.go
+++ b/plc4go/internal/knxnetip/ConnectionInternalOperations.go
@@ -20,6 +20,7 @@
 package knxnetip
 
 import (
+	"context"
 	"github.com/apache/plc4x/plc4go/spi"
 	"reflect"
 	"time"
@@ -42,7 +43,7 @@ import (
 // They all assume the connection is checked and is available.
 ///////////////////////////////////////////////////////////////////////////////////////////////////////
 
-func (m *Connection) sendGatewaySearchRequest() (driverModel.SearchResponse, error) {
+func (m *Connection) sendGatewaySearchRequest(ctx context.Context) (driverModel.SearchResponse, error) {
 	localAddress, err := m.getLocalAddress()
 	if err != nil {
 		return nil, errors.Wrap(err, "error getting local address")
@@ -56,25 +57,21 @@ func (m *Connection) sendGatewaySearchRequest() (driverModel.SearchResponse, err
 
 	result := make(chan driverModel.SearchResponse)
 	errorResult := make(chan error)
-	err = m.messageCodec.SendRequest(searchRequest,
-		func(message spi.Message) bool {
-			searchResponse := driverModel.CastSearchResponse(message)
-			return searchResponse != nil
-		},
-		func(message spi.Message) error {
-			searchResponse := driverModel.CastSearchResponse(message)
-			result <- searchResponse
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
-			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+	err = m.messageCodec.SendRequest(ctx, searchRequest, func(message spi.Message) bool {
+		searchResponse := driverModel.CastSearchResponse(message)
+		return searchResponse != nil
+	}, func(message spi.Message) error {
+		searchResponse := driverModel.CastSearchResponse(message)
+		result <- searchResponse
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending search request")
@@ -99,7 +96,7 @@ func (m *Connection) sendGatewaySearchRequest() (driverModel.SearchResponse, err
 	}
 }
 
-func (m *Connection) sendGatewayConnectionRequest() (driverModel.ConnectionResponse, error) {
+func (m *Connection) sendGatewayConnectionRequest(ctx context.Context) (driverModel.ConnectionResponse, error) {
 	localAddress, err := m.getLocalAddress()
 	if err != nil {
 		return nil, errors.Wrap(err, "error getting local address")
@@ -114,25 +111,21 @@ func (m *Connection) sendGatewayConnectionRequest() (driverModel.ConnectionRespo
 
 	result := make(chan driverModel.ConnectionResponse)
 	errorResult := make(chan error)
-	err = m.messageCodec.SendRequest(connectionRequest,
-		func(message spi.Message) bool {
-			connectionResponse := driverModel.CastConnectionResponse(message)
-			return connectionResponse != nil
-		},
-		func(message spi.Message) error {
-			connectionResponse := driverModel.CastConnectionResponse(message)
-			result <- connectionResponse
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
-			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+	err = m.messageCodec.SendRequest(ctx, connectionRequest, func(message spi.Message) bool {
+		connectionResponse := driverModel.CastConnectionResponse(message)
+		return connectionResponse != nil
+	}, func(message spi.Message) error {
+		connectionResponse := driverModel.CastConnectionResponse(message)
+		result <- connectionResponse
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending request")
@@ -146,7 +139,7 @@ func (m *Connection) sendGatewayConnectionRequest() (driverModel.ConnectionRespo
 	}
 }
 
-func (m *Connection) sendGatewayDisconnectionRequest() (driverModel.DisconnectResponse, error) {
+func (m *Connection) sendGatewayDisconnectionRequest(ctx context.Context) (driverModel.DisconnectResponse, error) {
 	localAddress, err := m.getLocalAddress()
 	if err != nil {
 		return nil, errors.Wrap(err, "error getting local address")
@@ -164,25 +157,21 @@ func (m *Connection) sendGatewayDisconnectionRequest() (driverModel.DisconnectRe
 
 	result := make(chan driverModel.DisconnectResponse)
 	errorResult := make(chan error)
-	err = m.messageCodec.SendRequest(disconnectRequest,
-		func(message spi.Message) bool {
-			disconnectResponse := driverModel.CastDisconnectResponse(message)
-			return disconnectResponse != nil
-		},
-		func(message spi.Message) error {
-			disconnectResponse := driverModel.CastDisconnectResponse(message)
-			result <- disconnectResponse
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
-			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+	err = m.messageCodec.SendRequest(ctx, disconnectRequest, func(message spi.Message) bool {
+		disconnectResponse := driverModel.CastDisconnectResponse(message)
+		return disconnectResponse != nil
+	}, func(message spi.Message) error {
+		disconnectResponse := driverModel.CastDisconnectResponse(message)
+		result <- disconnectResponse
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending request")
@@ -196,7 +185,7 @@ func (m *Connection) sendGatewayDisconnectionRequest() (driverModel.DisconnectRe
 	}
 }
 
-func (m *Connection) sendConnectionStateRequest() (driverModel.ConnectionStateResponse, error) {
+func (m *Connection) sendConnectionStateRequest(ctx context.Context) (driverModel.ConnectionStateResponse, error) {
 	localAddress, err := m.getLocalAddress()
 	if err != nil {
 		return nil, errors.Wrap(err, "error getting local address")
@@ -211,25 +200,21 @@ func (m *Connection) sendConnectionStateRequest() (driverModel.ConnectionStateRe
 
 	result := make(chan driverModel.ConnectionStateResponse)
 	errorResult := make(chan error)
-	err = m.messageCodec.SendRequest(connectionStateRequest,
-		func(message spi.Message) bool {
-			connectionStateResponse := driverModel.CastConnectionStateResponse(message)
-			return connectionStateResponse != nil
-		},
-		func(message spi.Message) error {
-			connectionStateResponse := driverModel.CastConnectionStateResponse(message)
-			result <- connectionStateResponse
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
-			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+	err = m.messageCodec.SendRequest(ctx, connectionStateRequest, func(message spi.Message) bool {
+		connectionStateResponse := driverModel.CastConnectionStateResponse(message)
+		return connectionStateResponse != nil
+	}, func(message spi.Message) error {
+		connectionStateResponse := driverModel.CastConnectionStateResponse(message)
+		result <- connectionStateResponse
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending request")
@@ -243,7 +228,7 @@ func (m *Connection) sendConnectionStateRequest() (driverModel.ConnectionStateRe
 	}
 }
 
-func (m *Connection) sendGroupAddressReadRequest(groupAddress []byte) (driverModel.ApduDataGroupValueResponse, error) {
+func (m *Connection) sendGroupAddressReadRequest(ctx context.Context, groupAddress []byte) (driverModel.ApduDataGroupValueResponse, error) {
 	// Send the property read request and wait for a confirmation that this property is readable.
 	groupAddressReadRequest := driverModel.NewTunnelingRequest(
 		driverModel.NewTunnelingRequestDataBlock(m.CommunicationChannelId, m.getNewSequenceCounter()),
@@ -269,52 +254,47 @@ func (m *Connection) sendGroupAddressReadRequest(groupAddress []byte) (driverMod
 
 	result := make(chan driverModel.ApduDataGroupValueResponse)
 	errorResult := make(chan error)
-	err := m.messageCodec.SendRequest(
-		groupAddressReadRequest,
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			if lDataInd == nil {
-				return false
-			}
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			if dataFrameExt == nil {
-				return false
-			}
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			if dataContainer == nil {
-				return false
-			}
-			groupReadResponse := driverModel.CastApduDataGroupValueResponse(dataContainer.GetDataApdu())
-			if groupReadResponse == nil {
-				return false
-			}
-			// Check if it's a value response for the given group address
-			return dataFrameExt.GetGroupAddress() && reflect.DeepEqual(dataFrameExt.GetSourceAddress(), groupAddress)
-		},
-		func(message spi.Message) error {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			groupReadResponse := driverModel.CastApduDataGroupValueResponse(dataContainer.GetDataApdu())
-
-			result <- groupReadResponse
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
-			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+	err := m.messageCodec.SendRequest(ctx, groupAddressReadRequest, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		if lDataInd == nil {
+			return false
+		}
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		if dataFrameExt == nil {
+			return false
+		}
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		if dataContainer == nil {
+			return false
+		}
+		groupReadResponse := driverModel.CastApduDataGroupValueResponse(dataContainer.GetDataApdu())
+		if groupReadResponse == nil {
+			return false
+		}
+		// Check if it's a value response for the given group address
+		return dataFrameExt.GetGroupAddress() && reflect.DeepEqual(dataFrameExt.GetSourceAddress(), groupAddress)
+	}, func(message spi.Message) error {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		groupReadResponse := driverModel.CastApduDataGroupValueResponse(dataContainer.GetDataApdu())
+
+		result <- groupReadResponse
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending request")
@@ -328,7 +308,7 @@ func (m *Connection) sendGroupAddressReadRequest(groupAddress []byte) (driverMod
 	}
 }
 
-func (m *Connection) sendDeviceConnectionRequest(targetAddress driverModel.KnxAddress) (driverModel.ApduControlConnect, error) {
+func (m *Connection) sendDeviceConnectionRequest(ctx context.Context, targetAddress driverModel.KnxAddress) (driverModel.ApduControlConnect, error) {
 	// Send a connection request to the individual KNX device
 	deviceConnectionRequest := driverModel.NewTunnelingRequest(
 		driverModel.NewTunnelingRequestDataBlock(m.CommunicationChannelId, m.getNewSequenceCounter()),
@@ -354,59 +334,53 @@ func (m *Connection) sendDeviceConnectionRequest(targetAddress driverModel.KnxAd
 
 	result := make(chan driverModel.ApduControlConnect)
 	errorResult := make(chan error)
-	err := m.messageCodec.SendRequest(
-		deviceConnectionRequest,
-		// The Gateway is now supposed to send an Ack to this request.
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
-			if lDataCon == nil {
-				return false
-			}
-			lDataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
-			if lDataFrameExt == nil {
-				return false
-			}
-			// Check if the address matches
-			if ByteArrayToKnxAddress(lDataFrameExt.GetDestinationAddress()) != targetAddress {
-				return false
-			}
-			apduControlContainer := driverModel.CastApduControlContainer(lDataFrameExt.GetApdu())
-			if apduControlContainer == nil {
-				return false
-			}
-			apduControlConnect := driverModel.CastApduControlConnect(apduControlContainer.GetControlApdu())
-			return apduControlConnect != nil
-		},
-		func(message spi.Message) error {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
-			lDataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
-			apduControlContainer := driverModel.CastApduControlContainer(lDataFrameExt.GetApdu())
-			apduControlConnect := driverModel.CastApduControlConnect(apduControlContainer.GetControlApdu())
-
-			// If the error flag is set, there was an error connecting
-			if lDataCon.GetDataFrame().GetErrorFlag() {
-				errorResult <- errors.Errorf("error connecting to device at: %s", KnxAddressToString(targetAddress))
-			} else {
-				result <- apduControlConnect
-			}
+	err := m.messageCodec.SendRequest(ctx, deviceConnectionRequest, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+		if lDataCon == nil {
+			return false
+		}
+		lDataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+		if lDataFrameExt == nil {
+			return false
+		}
+		// Check if the address matches
+		if ByteArrayToKnxAddress(lDataFrameExt.GetDestinationAddress()) != targetAddress {
+			return false
+		}
+		apduControlContainer := driverModel.CastApduControlContainer(lDataFrameExt.GetApdu())
+		if apduControlContainer == nil {
+			return false
+		}
+		apduControlConnect := driverModel.CastApduControlConnect(apduControlContainer.GetControlApdu())
+		return apduControlConnect != nil
+	}, func(message spi.Message) error {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+		lDataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+		apduControlContainer := driverModel.CastApduControlContainer(lDataFrameExt.GetApdu())
+		apduControlConnect := driverModel.CastApduControlConnect(apduControlContainer.GetControlApdu())
+
+		// If the error flag is set, there was an error connecting
+		if lDataCon.GetDataFrame().GetErrorFlag() {
+			errorResult <- errors.Errorf("error connecting to device at: %s", KnxAddressToString(targetAddress))
+		} else {
+			result <- apduControlConnect
+		}
 
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
-			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending request")
@@ -420,7 +394,7 @@ func (m *Connection) sendDeviceConnectionRequest(targetAddress driverModel.KnxAd
 	}
 }
 
-func (m *Connection) sendDeviceDisconnectionRequest(targetAddress driverModel.KnxAddress) (driverModel.ApduControlDisconnect, error) {
+func (m *Connection) sendDeviceDisconnectionRequest(ctx context.Context, targetAddress driverModel.KnxAddress) (driverModel.ApduControlDisconnect, error) {
 	// Send a connection request to the individual KNX device
 	deviceDisconnectionRequest := driverModel.NewTunnelingRequest(
 		driverModel.NewTunnelingRequestDataBlock(m.CommunicationChannelId, m.getNewSequenceCounter()),
@@ -446,60 +420,54 @@ func (m *Connection) sendDeviceDisconnectionRequest(targetAddress driverModel.Kn
 
 	result := make(chan driverModel.ApduControlDisconnect)
 	errorResult := make(chan error)
-	err := m.messageCodec.SendRequest(
-		deviceDisconnectionRequest,
-		// The Gateway is now supposed to send an Ack to this request.
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
-			if lDataCon == nil {
-				return false
-			}
-			dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
-			if dataFrameExt == nil {
-				return false
-			}
-			curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
-			// Check if the address matches
-			if curTargetAddress != targetAddress {
-				return false
-			}
-			apduControlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
-			if apduControlContainer == nil {
-				return false
-			}
-			apduControlDisconnect := driverModel.CastApduControlDisconnect(apduControlContainer.GetControlApdu())
-			return apduControlDisconnect != nil
-		},
-		func(message spi.Message) error {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
-			dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
-			apduControlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
-			apduControlDisconnect := driverModel.CastApduControlDisconnect(apduControlContainer.GetControlApdu())
-
-			// If the error flag is set, there was an error disconnecting
-			if lDataCon.GetDataFrame().GetErrorFlag() {
-				errorResult <- errors.Errorf("error disconnecting from device at: %s", KnxAddressToString(targetAddress))
-			} else {
-				result <- apduControlDisconnect
-			}
+	err := m.messageCodec.SendRequest(ctx, deviceDisconnectionRequest, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+		if lDataCon == nil {
+			return false
+		}
+		dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+		if dataFrameExt == nil {
+			return false
+		}
+		curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
+		// Check if the address matches
+		if curTargetAddress != targetAddress {
+			return false
+		}
+		apduControlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
+		if apduControlContainer == nil {
+			return false
+		}
+		apduControlDisconnect := driverModel.CastApduControlDisconnect(apduControlContainer.GetControlApdu())
+		return apduControlDisconnect != nil
+	}, func(message spi.Message) error {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+		dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+		apduControlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
+		apduControlDisconnect := driverModel.CastApduControlDisconnect(apduControlContainer.GetControlApdu())
+
+		// If the error flag is set, there was an error disconnecting
+		if lDataCon.GetDataFrame().GetErrorFlag() {
+			errorResult <- errors.Errorf("error disconnecting from device at: %s", KnxAddressToString(targetAddress))
+		} else {
+			result <- apduControlDisconnect
+		}
 
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
-			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending request")
@@ -513,7 +481,7 @@ func (m *Connection) sendDeviceDisconnectionRequest(targetAddress driverModel.Kn
 	}
 }
 
-func (m *Connection) sendDeviceAuthentication(targetAddress driverModel.KnxAddress, authenticationLevel uint8, buildingKey []byte) (driverModel.ApduDataExtAuthorizeResponse, error) {
+func (m *Connection) sendDeviceAuthentication(ctx context.Context, targetAddress driverModel.KnxAddress, authenticationLevel uint8, buildingKey []byte) (driverModel.ApduDataExtAuthorizeResponse, error) {
 	// Check if there is already a connection available,
 	// if not, create a new one.
 	connection, ok := m.DeviceConnections[targetAddress]
@@ -556,80 +524,74 @@ func (m *Connection) sendDeviceAuthentication(targetAddress driverModel.KnxAddre
 
 	result := make(chan driverModel.ApduDataExtAuthorizeResponse)
 	errorResult := make(chan error)
-	err := m.messageCodec.SendRequest(
-		deviceAuthenticationRequest,
-		// The Gateway is now supposed to send an Ack to this request.
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			if lDataInd == nil {
-				return false
-			}
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			if dataFrameExt == nil {
-				return false
-			}
-			apduDataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			if apduDataContainer == nil {
-				return false
-			}
-			apduDataOther := driverModel.CastApduDataOther(apduDataContainer.GetDataApdu())
-			if apduDataOther == nil {
-				return false
-			}
-			apduAuthorizeResponse := driverModel.CastApduDataExtAuthorizeResponse(apduDataOther.GetExtendedApdu())
-			if apduAuthorizeResponse == nil {
-				return false
-			}
-			curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
-			// Check if the addresses match
-			if curTargetAddress != m.ClientKnxAddress {
-				return false
-			}
-			if dataFrameExt.GetSourceAddress() != targetAddress {
-				return false
-			}
-			// Check if the counter matches
-			if dataFrameExt.GetApdu().GetCounter() != counter {
-				return false
-			}
-			return true
-		},
-		func(message spi.Message) error {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			apduDataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			apduDataOther := driverModel.CastApduDataOther(apduDataContainer.GetDataApdu())
-			apduAuthorizeResponse := driverModel.CastApduDataExtAuthorizeResponse(apduDataOther.GetExtendedApdu())
-
-			// Acknowledge the receipt
-			_ = m.sendDeviceAck(targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
-				// If the error flag is set, there was an error authenticating
-				if lDataInd.GetDataFrame().GetErrorFlag() {
-					errorResult <- errors.New("error authenticating at device: " + KnxAddressToString(targetAddress))
-				} else if err != nil {
-					errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
-				} else {
-					result <- apduAuthorizeResponse
-				}
-			})
-
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
+	err := m.messageCodec.SendRequest(ctx, deviceAuthenticationRequest, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		if lDataInd == nil {
+			return false
+		}
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		if dataFrameExt == nil {
+			return false
+		}
+		apduDataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		if apduDataContainer == nil {
+			return false
+		}
+		apduDataOther := driverModel.CastApduDataOther(apduDataContainer.GetDataApdu())
+		if apduDataOther == nil {
+			return false
+		}
+		apduAuthorizeResponse := driverModel.CastApduDataExtAuthorizeResponse(apduDataOther.GetExtendedApdu())
+		if apduAuthorizeResponse == nil {
+			return false
+		}
+		curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
+		// Check if the addresses match
+		if curTargetAddress != m.ClientKnxAddress {
+			return false
+		}
+		if dataFrameExt.GetSourceAddress() != targetAddress {
+			return false
+		}
+		// Check if the counter matches
+		if dataFrameExt.GetApdu().GetCounter() != counter {
+			return false
+		}
+		return true
+	}, func(message spi.Message) error {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		apduDataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		apduDataOther := driverModel.CastApduDataOther(apduDataContainer.GetDataApdu())
+		apduAuthorizeResponse := driverModel.CastApduDataExtAuthorizeResponse(apduDataOther.GetExtendedApdu())
+
+		// Acknowledge the receipt
+		_ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
+			// If the error flag is set, there was an error authenticating
+			if lDataInd.GetDataFrame().GetErrorFlag() {
+				errorResult <- errors.New("error authenticating at device: " + KnxAddressToString(targetAddress))
+			} else if err != nil {
+				errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
+			} else {
+				result <- apduAuthorizeResponse
 			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+		})
+
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending request")
@@ -643,7 +605,7 @@ func (m *Connection) sendDeviceAuthentication(targetAddress driverModel.KnxAddre
 	}
 }
 
-func (m *Connection) sendDeviceDeviceDescriptorReadRequest(targetAddress driverModel.KnxAddress) (driverModel.ApduDataDeviceDescriptorResponse, error) {
+func (m *Connection) sendDeviceDeviceDescriptorReadRequest(ctx context.Context, targetAddress driverModel.KnxAddress) (driverModel.ApduDataDeviceDescriptorResponse, error) {
 	// Next, read the device descriptor so we know how we have to communicate with the device.
 	counter := m.getNextCounter(targetAddress)
 	deviceDescriptorReadRequest := driverModel.NewTunnelingRequest(
@@ -673,70 +635,65 @@ func (m *Connection) sendDeviceDeviceDescriptorReadRequest(targetAddress driverM
 
 	result := make(chan driverModel.ApduDataDeviceDescriptorResponse)
 	errorResult := make(chan error)
-	err := m.messageCodec.SendRequest(
-		deviceDescriptorReadRequest,
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			if lDataInd == nil {
-				return false
-			}
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			if dataFrameExt == nil {
-				return false
-			}
-			// Check if the address matches
-			if dataFrameExt.GetSourceAddress() != targetAddress {
-				return false
-			}
-			// Check if the counter matches
-			if dataFrameExt.GetApdu().GetCounter() != counter {
-				return false
-			}
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			if dataContainer == nil {
-				return false
-			}
-			deviceDescriptorResponse := driverModel.CastApduDataDeviceDescriptorResponse(dataContainer.GetDataApdu())
-			if deviceDescriptorResponse == nil {
-				return false
-			}
-			return true
-		},
-		func(message spi.Message) error {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			dataFrame := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			dataContainer := driverModel.CastApduDataContainer(dataFrame.GetApdu())
-			deviceDescriptorResponse := driverModel.CastApduDataDeviceDescriptorResponse(dataContainer.GetDataApdu())
-
-			// Acknowledge the receipt
-			_ = m.sendDeviceAck(targetAddress, dataFrame.GetApdu().GetCounter(), func(err error) {
-				// If the error flag is set, there was an error authenticating
-				if lDataInd.GetDataFrame().GetErrorFlag() {
-					errorResult <- errors.New("error reading device descriptor from device: " + KnxAddressToString(targetAddress))
-				} else if err != nil {
-					errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
-				} else {
-					result <- deviceDescriptorResponse
-				}
-			})
-
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
+	err := m.messageCodec.SendRequest(ctx, deviceDescriptorReadRequest, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		if lDataInd == nil {
+			return false
+		}
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		if dataFrameExt == nil {
+			return false
+		}
+		// Check if the address matches
+		if dataFrameExt.GetSourceAddress() != targetAddress {
+			return false
+		}
+		// Check if the counter matches
+		if dataFrameExt.GetApdu().GetCounter() != counter {
+			return false
+		}
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		if dataContainer == nil {
+			return false
+		}
+		deviceDescriptorResponse := driverModel.CastApduDataDeviceDescriptorResponse(dataContainer.GetDataApdu())
+		if deviceDescriptorResponse == nil {
+			return false
+		}
+		return true
+	}, func(message spi.Message) error {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		dataFrame := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		dataContainer := driverModel.CastApduDataContainer(dataFrame.GetApdu())
+		deviceDescriptorResponse := driverModel.CastApduDataDeviceDescriptorResponse(dataContainer.GetDataApdu())
+
+		// Acknowledge the receipt
+		_ = m.sendDeviceAck(ctx, targetAddress, dataFrame.GetApdu().GetCounter(), func(err error) {
+			// If the error flag is set, there was an error authenticating
+			if lDataInd.GetDataFrame().GetErrorFlag() {
+				errorResult <- errors.New("error reading device descriptor from device: " + KnxAddressToString(targetAddress))
+			} else if err != nil {
+				errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
+			} else {
+				result <- deviceDescriptorResponse
 			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+		})
+
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending device descriptor read request")
@@ -750,7 +707,7 @@ func (m *Connection) sendDeviceDeviceDescriptorReadRequest(targetAddress driverM
 	}
 }
 
-func (m *Connection) sendDevicePropertyReadRequest(targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) (driverModel.ApduDataExtPropertyValueResponse, error) {
+func (m *Connection) sendDevicePropertyReadRequest(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8, propertyIndex uint16, numElements uint8) (driverModel.ApduDataExtPropertyValueResponse, error) {
 	// Next, read the device descriptor so we know how we have to communicate with the device.
 	// Send the property read request and wait for a confirmation that this property is readable.
 	counter := m.getNextCounter(targetAddress)
@@ -787,75 +744,70 @@ func (m *Connection) sendDevicePropertyReadRequest(targetAddress driverModel.Knx
 
 	result := make(chan driverModel.ApduDataExtPropertyValueResponse)
 	errorResult := make(chan error)
-	err := m.messageCodec.SendRequest(
-		propertyReadRequest,
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			if lDataInd == nil {
-				return false
-			}
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			if dataFrameExt == nil {
-				return false
-			}
-			// Check if the address matches
-			if dataFrameExt.GetSourceAddress() != targetAddress {
-				return false
-			}
-			// Check if the counter matches
-			if dataFrameExt.GetApdu().GetCounter() != counter {
-				return false
-			}
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			if dataContainer == nil {
-				return false
-			}
-			dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
-			if dataApduOther == nil {
-				return false
-			}
-			propertyValueResponse := driverModel.CastApduDataExtPropertyValueResponse(dataApduOther.GetExtendedApdu())
-			if propertyValueResponse == nil {
-				return false
-			}
-			return propertyValueResponse.GetObjectIndex() == objectId && propertyValueResponse.GetPropertyId() == propertyId
-		},
-		func(message spi.Message) error {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
-			propertyValueResponse := driverModel.CastApduDataExtPropertyValueResponse(dataApduOther.GetExtendedApdu())
-
-			// Acknowledge the receipt
-			_ = m.sendDeviceAck(targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
-				// If the error flag is set, there was an error authenticating
-				if lDataInd.GetDataFrame().GetErrorFlag() {
-					errorResult <- errors.New("error reading property value from device: " + KnxAddressToString(targetAddress))
-				} else if err != nil {
-					errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
-				} else {
-					result <- propertyValueResponse
-				}
-			})
-
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
+	err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		if lDataInd == nil {
+			return false
+		}
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		if dataFrameExt == nil {
+			return false
+		}
+		// Check if the address matches
+		if dataFrameExt.GetSourceAddress() != targetAddress {
+			return false
+		}
+		// Check if the counter matches
+		if dataFrameExt.GetApdu().GetCounter() != counter {
+			return false
+		}
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		if dataContainer == nil {
+			return false
+		}
+		dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
+		if dataApduOther == nil {
+			return false
+		}
+		propertyValueResponse := driverModel.CastApduDataExtPropertyValueResponse(dataApduOther.GetExtendedApdu())
+		if propertyValueResponse == nil {
+			return false
+		}
+		return propertyValueResponse.GetObjectIndex() == objectId && propertyValueResponse.GetPropertyId() == propertyId
+	}, func(message spi.Message) error {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
+		propertyValueResponse := driverModel.CastApduDataExtPropertyValueResponse(dataApduOther.GetExtendedApdu())
+
+		// Acknowledge the receipt
+		_ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
+			// If the error flag is set, there was an error authenticating
+			if lDataInd.GetDataFrame().GetErrorFlag() {
+				errorResult <- errors.New("error reading property value from device: " + KnxAddressToString(targetAddress))
+			} else if err != nil {
+				errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
+			} else {
+				result <- propertyValueResponse
 			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+		})
+
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending device property read request")
@@ -869,7 +821,7 @@ func (m *Connection) sendDevicePropertyReadRequest(targetAddress driverModel.Knx
 	}
 }
 
-func (m *Connection) sendDevicePropertyDescriptionReadRequest(targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) (driverModel.ApduDataExtPropertyDescriptionResponse, error) {
+func (m *Connection) sendDevicePropertyDescriptionReadRequest(ctx context.Context, targetAddress driverModel.KnxAddress, objectId uint8, propertyId uint8) (driverModel.ApduDataExtPropertyDescriptionResponse, error) {
 	// Next, read the device descriptor so we know how we have to communicate with the device.
 	// Send the property read request and wait for a confirmation that this property is readable.
 	counter := m.getNextCounter(targetAddress)
@@ -906,75 +858,70 @@ func (m *Connection) sendDevicePropertyDescriptionReadRequest(targetAddress driv
 
 	result := make(chan driverModel.ApduDataExtPropertyDescriptionResponse)
 	errorResult := make(chan error)
-	err := m.messageCodec.SendRequest(
-		propertyReadRequest,
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			if lDataInd == nil {
-				return false
-			}
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			if dataFrameExt == nil {
-				return false
-			}
-			// Check if the address matches
-			if dataFrameExt.GetSourceAddress() != targetAddress {
-				return false
-			}
-			// Check if the counter matches
-			if dataFrameExt.GetApdu().GetCounter() != counter {
-				return false
-			}
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			if dataContainer == nil {
-				return false
-			}
-			dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
-			if dataApduOther == nil {
-				return false
-			}
-			propertyDescriptionResponse := driverModel.CastApduDataExtPropertyDescriptionResponse(dataApduOther.GetExtendedApdu())
-			if propertyDescriptionResponse == nil {
-				return false
-			}
-			return propertyDescriptionResponse.GetObjectIndex() == objectId && propertyDescriptionResponse.GetPropertyId() == propertyId
-		},
-		func(message spi.Message) error {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
-			propertyDescriptionResponse := driverModel.CastApduDataExtPropertyDescriptionResponse(dataApduOther.GetExtendedApdu())
-
-			// Acknowledge the receipt
-			_ = m.sendDeviceAck(targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
-				// If the error flag is set, there was an error authenticating
-				if lDataInd.GetDataFrame().GetErrorFlag() {
-					errorResult <- errors.Errorf("error reading property description from device: %s", KnxAddressToString(targetAddress))
-				} else if err != nil {
-					errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
-				} else {
-					result <- propertyDescriptionResponse
-				}
-			})
-
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
+	err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		if lDataInd == nil {
+			return false
+		}
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		if dataFrameExt == nil {
+			return false
+		}
+		// Check if the address matches
+		if dataFrameExt.GetSourceAddress() != targetAddress {
+			return false
+		}
+		// Check if the counter matches
+		if dataFrameExt.GetApdu().GetCounter() != counter {
+			return false
+		}
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		if dataContainer == nil {
+			return false
+		}
+		dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
+		if dataApduOther == nil {
+			return false
+		}
+		propertyDescriptionResponse := driverModel.CastApduDataExtPropertyDescriptionResponse(dataApduOther.GetExtendedApdu())
+		if propertyDescriptionResponse == nil {
+			return false
+		}
+		return propertyDescriptionResponse.GetObjectIndex() == objectId && propertyDescriptionResponse.GetPropertyId() == propertyId
+	}, func(message spi.Message) error {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		dataApduOther := driverModel.CastApduDataOther(dataContainer.GetDataApdu())
+		propertyDescriptionResponse := driverModel.CastApduDataExtPropertyDescriptionResponse(dataApduOther.GetExtendedApdu())
+
+		// Acknowledge the receipt
+		_ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
+			// If the error flag is set, there was an error authenticating
+			if lDataInd.GetDataFrame().GetErrorFlag() {
+				errorResult <- errors.Errorf("error reading property description from device: %s", KnxAddressToString(targetAddress))
+			} else if err != nil {
+				errorResult <- errors.Wrapf(err, "error sending ack to device: %s", KnxAddressToString(targetAddress))
+			} else {
+				result <- propertyDescriptionResponse
 			}
-			errorResult <- errors.Wrapf(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+		})
+
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrapf(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending property description read request")
@@ -988,7 +935,7 @@ func (m *Connection) sendDevicePropertyDescriptionReadRequest(targetAddress driv
 	}
 }
 
-func (m *Connection) sendDeviceMemoryReadRequest(targetAddress driverModel.KnxAddress, address uint16, numBytes uint8) (driverModel.ApduDataMemoryResponse, error) {
+func (m *Connection) sendDeviceMemoryReadRequest(ctx context.Context, targetAddress driverModel.KnxAddress, address uint16, numBytes uint8) (driverModel.ApduDataMemoryResponse, error) {
 	// Next, read the device descriptor so we know how we have to communicate with the device.
 	counter := m.getNextCounter(targetAddress)
 
@@ -1023,71 +970,66 @@ func (m *Connection) sendDeviceMemoryReadRequest(targetAddress driverModel.KnxAd
 
 	result := make(chan driverModel.ApduDataMemoryResponse)
 	errorResult := make(chan error)
-	err := m.messageCodec.SendRequest(
-		propertyReadRequest,
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			if lDataInd == nil {
-				return false
-			}
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			if dataFrameExt == nil {
-				return false
-			}
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			if dataContainer == nil {
-				return false
-			}
-			dataApduMemoryResponse := driverModel.CastApduDataMemoryResponse(dataContainer.GetDataApdu())
-			if dataApduMemoryResponse == nil {
-				return false
-			}
+	err := m.messageCodec.SendRequest(ctx, propertyReadRequest, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		if lDataInd == nil {
+			return false
+		}
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		if dataFrameExt == nil {
+			return false
+		}
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		if dataContainer == nil {
+			return false
+		}
+		dataApduMemoryResponse := driverModel.CastApduDataMemoryResponse(dataContainer.GetDataApdu())
+		if dataApduMemoryResponse == nil {
+			return false
+		}
 
-			// Check if the address matches
-			if dataFrameExt.GetSourceAddress() != targetAddress {
-				return false
-			}
-			// Check if the counter matches
-			if dataFrameExt.GetApdu().GetCounter() != counter {
-				return false
-			}
-			return dataApduMemoryResponse.GetAddress() == address
-		},
-		func(message spi.Message) error {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
-			dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
-			dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
-			dataApduMemoryResponse := driverModel.CastApduDataMemoryResponse(dataContainer.GetDataApdu())
-
-			// Acknowledge the receipt
-			_ = m.sendDeviceAck(targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
-				// If the error flag is set, there was an error authenticating
-				if lDataInd.GetDataFrame().GetErrorFlag() {
-					errorResult <- errors.Errorf("error reading memory from device: %s", KnxAddressToString(targetAddress))
-				} else if err != nil {
-					errorResult <- errors.Errorf("error sending ack to device: %s", KnxAddressToString(targetAddress))
-				} else {
-					result <- dataApduMemoryResponse
-				}
-			})
-
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
+		// Check if the address matches
+		if dataFrameExt.GetSourceAddress() != targetAddress {
+			return false
+		}
+		// Check if the counter matches
+		if dataFrameExt.GetApdu().GetCounter() != counter {
+			return false
+		}
+		return dataApduMemoryResponse.GetAddress() == address
+	}, func(message spi.Message) error {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		lDataInd := driverModel.CastLDataInd(tunnelingRequest.GetCemi())
+		dataFrameExt := driverModel.CastLDataExtended(lDataInd.GetDataFrame())
+		dataContainer := driverModel.CastApduDataContainer(dataFrameExt.GetApdu())
+		dataApduMemoryResponse := driverModel.CastApduDataMemoryResponse(dataContainer.GetDataApdu())
+
+		// Acknowledge the receipt
+		_ = m.sendDeviceAck(ctx, targetAddress, dataFrameExt.GetApdu().GetCounter(), func(err error) {
+			// If the error flag is set, there was an error authenticating
+			if lDataInd.GetDataFrame().GetErrorFlag() {
+				errorResult <- errors.Errorf("error reading memory from device: %s", KnxAddressToString(targetAddress))
+			} else if err != nil {
+				errorResult <- errors.Errorf("error sending ack to device: %s", KnxAddressToString(targetAddress))
+			} else {
+				result <- dataApduMemoryResponse
 			}
-			errorResult <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.defaultTtl)
+		})
+
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		errorResult <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return nil, errors.Wrap(err, "got error sending memory read request")
@@ -1101,7 +1043,7 @@ func (m *Connection) sendDeviceMemoryReadRequest(targetAddress driverModel.KnxAd
 	}
 }
 
-func (m *Connection) sendDeviceAck(targetAddress driverModel.KnxAddress, counter uint8, callback func(err error)) error {
+func (m *Connection) sendDeviceAck(ctx context.Context, targetAddress driverModel.KnxAddress, counter uint8, callback func(err error)) error {
 	ack := driverModel.NewTunnelingRequest(
 		driverModel.NewTunnelingRequestDataBlock(m.CommunicationChannelId, m.getNewSequenceCounter()),
 		driverModel.NewLDataReq(
@@ -1124,57 +1066,52 @@ func (m *Connection) sendDeviceAck(targetAddress driverModel.KnxAddress, counter
 		0,
 	)
 
-	err := m.messageCodec.SendRequest(
-		ack,
-		func(message spi.Message) bool {
-			tunnelingRequest := driverModel.CastTunnelingRequest(message)
-			if tunnelingRequest == nil ||
-				tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
-				return false
-			}
-			lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
-			if lDataCon == nil {
-				return false
-			}
-			dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
-			if dataFrameExt == nil {
-				return false
-			}
-			// Check if the addresses match
-			if dataFrameExt.GetSourceAddress() != m.ClientKnxAddress {
-				return false
-			}
-			curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
-			if curTargetAddress != targetAddress {
-				return false
-			}
-			// Check if the counter matches
-			if dataFrameExt.GetApdu().GetCounter() != counter {
-				return false
-			}
-			controlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
-			if controlContainer == nil {
-				return false
-			}
-			dataApduAck := driverModel.CastApduControlAck(controlContainer.GetControlApdu())
-			if dataApduAck == nil {
-				return false
-			}
-			return true
-		},
-		func(message spi.Message) error {
-			callback(nil)
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				m.handleTimeout()
-			}
-			callback(errors.Wrap(err, "got error processing request"))
-			return nil
-		},
-		m.defaultTtl)
+	err := m.messageCodec.SendRequest(ctx, ack, func(message spi.Message) bool {
+		tunnelingRequest := driverModel.CastTunnelingRequest(message)
+		if tunnelingRequest == nil ||
+			tunnelingRequest.GetTunnelingRequestDataBlock().GetCommunicationChannelId() != m.CommunicationChannelId {
+			return false
+		}
+		lDataCon := driverModel.CastLDataCon(tunnelingRequest.GetCemi())
+		if lDataCon == nil {
+			return false
+		}
+		dataFrameExt := driverModel.CastLDataExtended(lDataCon.GetDataFrame())
+		if dataFrameExt == nil {
+			return false
+		}
+		// Check if the addresses match
+		if dataFrameExt.GetSourceAddress() != m.ClientKnxAddress {
+			return false
+		}
+		curTargetAddress := ByteArrayToKnxAddress(dataFrameExt.GetDestinationAddress())
+		if curTargetAddress != targetAddress {
+			return false
+		}
+		// Check if the counter matches
+		if dataFrameExt.GetApdu().GetCounter() != counter {
+			return false
+		}
+		controlContainer := driverModel.CastApduControlContainer(dataFrameExt.GetApdu())
+		if controlContainer == nil {
+			return false
+		}
+		dataApduAck := driverModel.CastApduControlAck(controlContainer.GetControlApdu())
+		if dataApduAck == nil {
+			return false
+		}
+		return true
+	}, func(message spi.Message) error {
+		callback(nil)
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			m.handleTimeout()
+		}
+		callback(errors.Wrap(err, "got error processing request"))
+		return nil
+	}, m.defaultTtl)
 
 	if err != nil {
 		return errors.Wrap(err, "got error sending ack request")
diff --git a/plc4go/internal/knxnetip/Reader.go b/plc4go/internal/knxnetip/Reader.go
index c1983d414..159739f06 100644
--- a/plc4go/internal/knxnetip/Reader.go
+++ b/plc4go/internal/knxnetip/Reader.go
@@ -108,7 +108,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
 					propertyField := field.(DevicePropertyAddressPlcField)
 
 					timeout := time.NewTimer(m.connection.defaultTtl)
-					results := m.connection.DeviceReadProperty(deviceAddress, propertyField.ObjectId, propertyField.PropertyId, propertyField.PropertyIndex, propertyField.NumElements)
+					results := m.connection.DeviceReadProperty(ctx, deviceAddress, propertyField.ObjectId, propertyField.PropertyId, propertyField.PropertyIndex, propertyField.NumElements)
 					select {
 					case result := <-results:
 						if !timeout.Stop() {
@@ -129,7 +129,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
 				case DeviceMemoryAddressPlcField:
 					timeout := time.NewTimer(m.connection.defaultTtl)
 					memoryField := field.(DeviceMemoryAddressPlcField)
-					results := m.connection.DeviceReadMemory(deviceAddress, memoryField.Address, memoryField.NumElements, memoryField.FieldType)
+					results := m.connection.DeviceReadMemory(ctx, deviceAddress, memoryField.Address, memoryField.NumElements, memoryField.FieldType)
 					select {
 					case result := <-results:
 						if !timeout.Stop() {
@@ -153,7 +153,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
 
 		// Get the group address values from the cache
 		for fieldName, field := range groupAddresses {
-			responseCode, plcValue := m.readGroupAddress(field)
+			responseCode, plcValue := m.readGroupAddress(ctx, field)
 			responseCodes[fieldName] = responseCode
 			plcValues[fieldName] = plcValue
 		}
@@ -169,7 +169,7 @@ func (m Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest) <
 	return resultChan
 }
 
-func (m Reader) readGroupAddress(field GroupAddressField) (apiModel.PlcResponseCode, apiValues.PlcValue) {
+func (m Reader) readGroupAddress(ctx context.Context, field GroupAddressField) (apiModel.PlcResponseCode, apiValues.PlcValue) {
 	rawAddresses, err := m.resolveAddresses(field)
 	if err != nil {
 		return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
@@ -193,7 +193,7 @@ func (m Reader) readGroupAddress(field GroupAddressField) (apiModel.PlcResponseC
 		// Otherwise respond with values from the cache.
 		if !ok {
 			addr := []byte{byte(numericAddress >> 8), byte(numericAddress & 0xFF)}
-			rrc := m.connection.ReadGroupAddress(addr, field.GetFieldType())
+			rrc := m.connection.ReadGroupAddress(ctx, addr, field.GetFieldType())
 			select {
 			case readResult := <-rrc:
 				if readResult.value != nil {
diff --git a/plc4go/internal/modbus/Connection.go b/plc4go/internal/modbus/Connection.go
index 5ecdd6079..bd1772dc3 100644
--- a/plc4go/internal/modbus/Connection.go
+++ b/plc4go/internal/modbus/Connection.go
@@ -20,6 +20,7 @@
 package modbus
 
 import (
+	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -90,35 +91,32 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
 }
 
 func (m *Connection) Ping() <-chan plc4go.PlcConnectionPingResult {
+	// TODO: use proper context
+	ctx := context.TODO()
 	log.Trace().Msg("Pinging")
 	result := make(chan plc4go.PlcConnectionPingResult)
 	go func() {
 		diagnosticRequestPdu := readWriteModel.NewModbusPDUDiagnosticRequest(0, 0x42)
 		pingRequest := readWriteModel.NewModbusTcpADU(1, m.unitIdentifier, diagnosticRequestPdu, false)
-		if err := m.messageCodec.SendRequest(
-			pingRequest,
-			func(message spi.Message) bool {
-				responseAdu := readWriteModel.CastModbusTcpADU(message)
-				return responseAdu.GetTransactionIdentifier() == 1 && responseAdu.GetUnitIdentifier() == m.unitIdentifier
-			},
-			func(message spi.Message) error {
-				log.Trace().Msgf("Received Message")
-				if message != nil {
-					// If we got a valid response (even if it will probably contain an error, we know the remote is available)
-					log.Trace().Msg("got valid response")
-					result <- _default.NewDefaultPlcConnectionPingResult(nil)
-				} else {
-					log.Trace().Msg("got no response")
-					result <- _default.NewDefaultPlcConnectionPingResult(errors.New("no response"))
-				}
-				return nil
-			},
-			func(err error) error {
-				log.Trace().Msgf("Received Error")
-				result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got error processing request"))
-				return nil
-			},
-			time.Second*1); err != nil {
+		if err := m.messageCodec.SendRequest(ctx, pingRequest, func(message spi.Message) bool {
+			responseAdu := readWriteModel.CastModbusTcpADU(message)
+			return responseAdu.GetTransactionIdentifier() == 1 && responseAdu.GetUnitIdentifier() == m.unitIdentifier
+		}, func(message spi.Message) error {
+			log.Trace().Msgf("Received Message")
+			if message != nil {
+				// If we got a valid response (even if it will probably contain an error, we know the remote is available)
+				log.Trace().Msg("got valid response")
+				result <- _default.NewDefaultPlcConnectionPingResult(nil)
+			} else {
+				log.Trace().Msg("got no response")
+				result <- _default.NewDefaultPlcConnectionPingResult(errors.New("no response"))
+			}
+			return nil
+		}, func(err error) error {
+			log.Trace().Msgf("Received Error")
+			result <- _default.NewDefaultPlcConnectionPingResult(errors.Wrap(err, "got error processing request"))
+			return nil
+		}, time.Second*1); err != nil {
 			result <- _default.NewDefaultPlcConnectionPingResult(err)
 		}
 	}()
diff --git a/plc4go/internal/modbus/Reader.go b/plc4go/internal/modbus/Reader.go
index d85d1ed4e..219849e6c 100644
--- a/plc4go/internal/modbus/Reader.go
+++ b/plc4go/internal/modbus/Reader.go
@@ -118,43 +118,38 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 
 		// Send the ADU over the wire
 		log.Trace().Msg("Send ADU")
-		if err = m.messageCodec.SendRequest(
-			requestAdu,
-			func(message spi.Message) bool {
-				responseAdu := message.(readWriteModel.ModbusTcpADU)
-				return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
-					responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
-			},
-			func(message spi.Message) error {
-				// Convert the response into an ADU
-				log.Trace().Msg("convert response to ADU")
-				responseAdu := message.(readWriteModel.ModbusTcpADU)
-				// Convert the modbus response into a PLC4X response
-				log.Trace().Msg("convert response to PLC4X response")
-				readResponse, err := m.ToPlc4xReadResponse(responseAdu, readRequest)
+		if err = m.messageCodec.SendRequest(ctx, requestAdu, func(message spi.Message) bool {
+			responseAdu := message.(readWriteModel.ModbusTcpADU)
+			return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
+				responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
+		}, func(message spi.Message) error {
+			// Convert the response into an ADU
+			log.Trace().Msg("convert response to ADU")
+			responseAdu := message.(readWriteModel.ModbusTcpADU)
+			// Convert the modbus response into a PLC4X response
+			log.Trace().Msg("convert response to PLC4X response")
+			readResponse, err := m.ToPlc4xReadResponse(responseAdu, readRequest)
 
-				if err != nil {
-					result <- &plc4goModel.DefaultPlcReadRequestResult{
-						Request: readRequest,
-						Err:     errors.Wrap(err, "Error decoding response"),
-					}
-					// TODO: should we return the error here?
-					return nil
-				}
-				result <- &plc4goModel.DefaultPlcReadRequestResult{
-					Request:  readRequest,
-					Response: readResponse,
-				}
-				return nil
-			},
-			func(err error) error {
+			if err != nil {
 				result <- &plc4goModel.DefaultPlcReadRequestResult{
 					Request: readRequest,
-					Err:     errors.Wrap(err, "got timeout while waiting for response"),
+					Err:     errors.Wrap(err, "Error decoding response"),
 				}
+				// TODO: should we return the error here?
 				return nil
-			},
-			time.Second*1); err != nil {
+			}
+			result <- &plc4goModel.DefaultPlcReadRequestResult{
+				Request:  readRequest,
+				Response: readResponse,
+			}
+			return nil
+		}, func(err error) error {
+			result <- &plc4goModel.DefaultPlcReadRequestResult{
+				Request: readRequest,
+				Err:     errors.Wrap(err, "got timeout while waiting for response"),
+			}
+			return nil
+		}, time.Second*1); err != nil {
 			result <- &plc4goModel.DefaultPlcReadRequestResult{
 				Request:  readRequest,
 				Response: nil,
diff --git a/plc4go/internal/modbus/Writer.go b/plc4go/internal/modbus/Writer.go
index d0ff99d9d..3d949d238 100644
--- a/plc4go/internal/modbus/Writer.go
+++ b/plc4go/internal/modbus/Writer.go
@@ -129,40 +129,35 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 		requestAdu := readWriteModel.NewModbusTcpADU(uint16(transactionIdentifier), m.unitIdentifier, pdu, false)
 
 		// Send the ADU over the wire
-		err = m.messageCodec.SendRequest(
-			requestAdu,
-			func(message spi.Message) bool {
-				responseAdu := message.(readWriteModel.ModbusTcpADU)
-				return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
-					responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
-			},
-			func(message spi.Message) error {
-				// Convert the response into an ADU
-				responseAdu := message.(readWriteModel.ModbusTcpADU)
-				// Convert the modbus response into a PLC4X response
-				readResponse, err := m.ToPlc4xWriteResponse(requestAdu, responseAdu, writeRequest)
-
-				if err != nil {
-					result <- &plc4goModel.DefaultPlcWriteRequestResult{
-						Request: writeRequest,
-						Err:     errors.Wrap(err, "Error decoding response"),
-					}
-				} else {
-					result <- &plc4goModel.DefaultPlcWriteRequestResult{
-						Request:  writeRequest,
-						Response: readResponse,
-					}
-				}
-				return nil
-			},
-			func(err error) error {
+		err = m.messageCodec.SendRequest(ctx, requestAdu, func(message spi.Message) bool {
+			responseAdu := message.(readWriteModel.ModbusTcpADU)
+			return responseAdu.GetTransactionIdentifier() == uint16(transactionIdentifier) &&
+				responseAdu.GetUnitIdentifier() == requestAdu.UnitIdentifier
+		}, func(message spi.Message) error {
+			// Convert the response into an ADU
+			responseAdu := message.(readWriteModel.ModbusTcpADU)
+			// Convert the modbus response into a PLC4X response
+			readResponse, err := m.ToPlc4xWriteResponse(requestAdu, responseAdu, writeRequest)
+
+			if err != nil {
 				result <- &plc4goModel.DefaultPlcWriteRequestResult{
 					Request: writeRequest,
-					Err:     errors.New("got timeout while waiting for response"),
+					Err:     errors.Wrap(err, "Error decoding response"),
+				}
+			} else {
+				result <- &plc4goModel.DefaultPlcWriteRequestResult{
+					Request:  writeRequest,
+					Response: readResponse,
 				}
-				return nil
-			},
-			time.Second*1)
+			}
+			return nil
+		}, func(err error) error {
+			result <- &plc4goModel.DefaultPlcWriteRequestResult{
+				Request: writeRequest,
+				Err:     errors.New("got timeout while waiting for response"),
+			}
+			return nil
+		}, time.Second*1)
 	}()
 	return result
 }
diff --git a/plc4go/internal/s7/Connection.go b/plc4go/internal/s7/Connection.go
index fd3c94396..a68206819 100644
--- a/plc4go/internal/s7/Connection.go
+++ b/plc4go/internal/s7/Connection.go
@@ -20,6 +20,7 @@
 package s7
 
 import (
+	"context"
 	"fmt"
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -105,6 +106,8 @@ func (m *Connection) GetMessageCodec() spi.MessageCodec {
 }
 
 func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+	// TODO: use proper context
+	ctx := context.TODO()
 	log.Trace().Msg("Connecting")
 	ch := make(chan plc4go.PlcConnectionConnectResult)
 	go func() {
@@ -122,7 +125,7 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 
 		// For testing purposes we can skip the waiting for a complete connection
 		if !m.driverContext.awaitSetupComplete {
-			go m.setupConnection(ch)
+			go m.setupConnection(ctx, ch)
 			log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
 			// Here we write directly and don't wait till the connection is "really" connected
 			// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
@@ -134,43 +137,37 @@ func (m *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 		// Only the TCP transport supports login.
 		log.Info().Msg("S7 Driver running in ACTIVE mode.")
 
-		m.setupConnection(ch)
+		m.setupConnection(ctx, ch)
 	}()
 	return ch
 }
 
-func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) {
+func (m *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult) {
 	log.Debug().Msg("Sending COTP Connection Request")
 	// Open the session on ISO Transport Protocol first.
 	cotpConnectionResult := make(chan readWriteModel.COTPPacketConnectionResponse)
 	cotpConnectionErrorChan := make(chan error)
-	if err := m.messageCodec.SendRequest(
-		readWriteModel.NewTPKTPacket(m.createCOTPConnectionRequest()),
-		func(message spi.Message) bool {
-			tpktPacket := message.(readWriteModel.TPKTPacket)
-			if tpktPacket == nil {
-				return false
-			}
-			cotpPacketConnectionResponse := tpktPacket.GetPayload().(readWriteModel.COTPPacketConnectionResponse)
-			return cotpPacketConnectionResponse != nil
-		},
-		func(message spi.Message) error {
-			tpktPacket := message.(readWriteModel.TPKTPacket)
-			cotpPacketConnectionResponse := tpktPacket.GetPayload().(readWriteModel.COTPPacketConnectionResponse)
-			cotpConnectionResult <- cotpPacketConnectionResponse
-			return nil
-		},
-		func(err error) error {
-			// If this is a timeout, do a check if the connection requires a reconnection
-			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-				log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-				m.Close()
-			}
-			cotpConnectionErrorChan <- errors.Wrap(err, "got error processing request")
-			return nil
-		},
-		m.GetTtl(),
-	); err != nil {
+	if err := m.messageCodec.SendRequest(ctx, readWriteModel.NewTPKTPacket(m.createCOTPConnectionRequest()), func(message spi.Message) bool {
+		tpktPacket := message.(readWriteModel.TPKTPacket)
+		if tpktPacket == nil {
+			return false
+		}
+		cotpPacketConnectionResponse := tpktPacket.GetPayload().(readWriteModel.COTPPacketConnectionResponse)
+		return cotpPacketConnectionResponse != nil
+	}, func(message spi.Message) error {
+		tpktPacket := message.(readWriteModel.TPKTPacket)
+		cotpPacketConnectionResponse := tpktPacket.GetPayload().(readWriteModel.COTPPacketConnectionResponse)
+		cotpConnectionResult <- cotpPacketConnectionResponse
+		return nil
+	}, func(err error) error {
+		// If this is a timeout, do a check if the connection requires a reconnection
+		if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+			log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+			m.Close()
+		}
+		cotpConnectionErrorChan <- errors.Wrap(err, "got error processing request")
+		return nil
+	}, m.GetTtl()); err != nil {
 		m.fireConnectionError(errors.Wrap(err, "Error during sending of COTP Connection Request"), ch)
 	}
 	select {
@@ -181,43 +178,37 @@ func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 		// Send an S7 login message.
 		s7ConnectionResult := make(chan readWriteModel.S7ParameterSetupCommunication)
 		s7ConnectionErrorChan := make(chan error)
-		if err := m.messageCodec.SendRequest(
-			m.createS7ConnectionRequest(cotpPacketConnectionResponse),
-			func(message spi.Message) bool {
-				tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
-				if !ok {
-					return false
-				}
-				cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
-				if !ok {
-					return false
-				}
-				messageResponseData, ok := cotpPacketData.GetPayload().(readWriteModel.S7MessageResponseDataExactly)
-				if !ok {
-					return false
-				}
-				_, ok = messageResponseData.GetParameter().(readWriteModel.S7ParameterSetupCommunicationExactly)
-				return ok
-			},
-			func(message spi.Message) error {
-				tpktPacket := message.(readWriteModel.TPKTPacket)
-				cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
-				messageResponseData := cotpPacketData.GetPayload().(readWriteModel.S7MessageResponseData)
-				setupCommunication := messageResponseData.GetParameter().(readWriteModel.S7ParameterSetupCommunication)
-				s7ConnectionResult <- setupCommunication
-				return nil
-			},
-			func(err error) error {
-				// If this is a timeout, do a check if the connection requires a reconnection
-				if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-					log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-					m.Close()
-				}
-				s7ConnectionErrorChan <- errors.Wrap(err, "got error processing request")
-				return nil
-			},
-			m.GetTtl(),
-		); err != nil {
+		if err := m.messageCodec.SendRequest(ctx, m.createS7ConnectionRequest(cotpPacketConnectionResponse), func(message spi.Message) bool {
+			tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
+			if !ok {
+				return false
+			}
+			cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
+			if !ok {
+				return false
+			}
+			messageResponseData, ok := cotpPacketData.GetPayload().(readWriteModel.S7MessageResponseDataExactly)
+			if !ok {
+				return false
+			}
+			_, ok = messageResponseData.GetParameter().(readWriteModel.S7ParameterSetupCommunicationExactly)
+			return ok
+		}, func(message spi.Message) error {
+			tpktPacket := message.(readWriteModel.TPKTPacket)
+			cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
+			messageResponseData := cotpPacketData.GetPayload().(readWriteModel.S7MessageResponseData)
+			setupCommunication := messageResponseData.GetParameter().(readWriteModel.S7ParameterSetupCommunication)
+			s7ConnectionResult <- setupCommunication
+			return nil
+		}, func(err error) error {
+			// If this is a timeout, do a check if the connection requires a reconnection
+			if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+				log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+				m.Close()
+			}
+			s7ConnectionErrorChan <- errors.Wrap(err, "got error processing request")
+			return nil
+		}, m.GetTtl()); err != nil {
 			m.fireConnectionError(errors.Wrap(err, "Error during sending of S7 Connection Request"), ch)
 		}
 		select {
@@ -248,42 +239,36 @@ func (m *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 			log.Debug().Msg("Sending S7 Identification Request")
 			s7IdentificationResult := make(chan readWriteModel.S7PayloadUserData)
 			s7IdentificationErrorChan := make(chan error)
-			if err := m.messageCodec.SendRequest(
-				m.createIdentifyRemoteMessage(),
-				func(message spi.Message) bool {
-					tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
-					if !ok {
-						return false
-					}
-					cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
-					if !ok {
-						return false
-					}
-					messageUserData, ok := cotpPacketData.GetPayload().(readWriteModel.S7MessageUserDataExactly)
-					if !ok {
-						return false
-					}
-					_, ok = messageUserData.GetPayload().(readWriteModel.S7PayloadUserDataExactly)
-					return ok
-				},
-				func(message spi.Message) error {
-					tpktPacket := message.(readWriteModel.TPKTPacket)
-					cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
-					messageUserData := cotpPacketData.GetPayload().(readWriteModel.S7MessageUserData)
-					s7IdentificationResult <- messageUserData.GetPayload().(readWriteModel.S7PayloadUserData)
-					return nil
-				},
-				func(err error) error {
-					// If this is a timeout, do a check if the connection requires a reconnection
-					if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
-						log.Warn().Msg("Timeout during Connection establishing, closing channel...")
-						m.Close()
-					}
-					s7IdentificationErrorChan <- errors.Wrap(err, "got error processing request")
-					return nil
-				},
-				m.GetTtl(),
-			); err != nil {
+			if err := m.messageCodec.SendRequest(ctx, m.createIdentifyRemoteMessage(), func(message spi.Message) bool {
+				tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
+				if !ok {
+					return false
+				}
+				cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
+				if !ok {
+					return false
+				}
+				messageUserData, ok := cotpPacketData.GetPayload().(readWriteModel.S7MessageUserDataExactly)
+				if !ok {
+					return false
+				}
+				_, ok = messageUserData.GetPayload().(readWriteModel.S7PayloadUserDataExactly)
+				return ok
+			}, func(message spi.Message) error {
+				tpktPacket := message.(readWriteModel.TPKTPacket)
+				cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
+				messageUserData := cotpPacketData.GetPayload().(readWriteModel.S7MessageUserData)
+				s7IdentificationResult <- messageUserData.GetPayload().(readWriteModel.S7PayloadUserData)
+				return nil
+			}, func(err error) error {
+				// If this is a timeout, do a check if the connection requires a reconnection
+				if _, isTimeout := err.(plcerrors.TimeoutError); isTimeout {
+					log.Warn().Msg("Timeout during Connection establishing, closing channel...")
+					m.Close()
+				}
+				s7IdentificationErrorChan <- errors.Wrap(err, "got error processing request")
+				return nil
+			}, m.GetTtl()); err != nil {
 				m.fireConnectionError(errors.Wrap(err, "Error during sending of identify remote Request"), ch)
 			}
 			select {
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 68094ce47..5f10bd074 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -99,54 +99,49 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 
 			// Send the  over the wire
 			log.Trace().Msg("Send ")
-			if err := m.messageCodec.SendRequest(
-				tpktPacket,
-				func(message spi.Message) bool {
-					tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
-					if !ok {
-						return false
-					}
-					cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
-					if !ok {
-						return false
-					}
-					payload := cotpPacketData.GetPayload()
-					if payload == nil {
-						return false
-					}
-					return payload.GetTpduReference() == tpduId
-				},
-				func(message spi.Message) error {
-					// Convert the response into an
-					log.Trace().Msg("convert response to ")
-					tpktPacket := message.(readWriteModel.TPKTPacket)
-					cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
-					payload := cotpPacketData.GetPayload()
-					// Convert the s7 response into a PLC4X response
-					log.Trace().Msg("convert response to PLC4X response")
-					readResponse, err := m.ToPlc4xReadResponse(payload, readRequest)
+			if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
+				tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
+				if !ok {
+					return false
+				}
+				cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
+				if !ok {
+					return false
+				}
+				payload := cotpPacketData.GetPayload()
+				if payload == nil {
+					return false
+				}
+				return payload.GetTpduReference() == tpduId
+			}, func(message spi.Message) error {
+				// Convert the response into a TPKT packet
+				log.Trace().Msg("convert response to ")
+				tpktPacket := message.(readWriteModel.TPKTPacket)
+				cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
+				payload := cotpPacketData.GetPayload()
+				// Convert the s7 response into a PLC4X response
+				log.Trace().Msg("convert response to PLC4X response")
+				readResponse, err := m.ToPlc4xReadResponse(payload, readRequest)
 
-					if err != nil {
-						result <- &plc4goModel.DefaultPlcReadRequestResult{
-							Request: readRequest,
-							Err:     errors.Wrap(err, "Error decoding response"),
-						}
-						return transaction.EndRequest()
-					}
-					result <- &plc4goModel.DefaultPlcReadRequestResult{
-						Request:  readRequest,
-						Response: readResponse,
-					}
-					return transaction.EndRequest()
-				},
-				func(err error) error {
+				if err != nil {
 					result <- &plc4goModel.DefaultPlcReadRequestResult{
 						Request: readRequest,
-						Err:     errors.Wrap(err, "got timeout while waiting for response"),
+						Err:     errors.Wrap(err, "Error decoding response"),
 					}
 					return transaction.EndRequest()
-				},
-				time.Second*1); err != nil {
+				}
+				result <- &plc4goModel.DefaultPlcReadRequestResult{
+					Request:  readRequest,
+					Response: readResponse,
+				}
+				return transaction.EndRequest()
+			}, func(err error) error {
+				result <- &plc4goModel.DefaultPlcReadRequestResult{
+					Request: readRequest,
+					Err:     errors.Wrap(err, "got timeout while waiting for response"),
+				}
+				return transaction.EndRequest()
+			}, time.Second*1); err != nil {
 				result <- &plc4goModel.DefaultPlcReadRequestResult{
 					Request:  readRequest,
 					Response: nil,
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index f86369b1e..40a69b73b 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -102,54 +102,49 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 		transaction := m.tm.StartTransaction()
 		transaction.Submit(func() {
 			// Send the  over the wire
-			if err := m.messageCodec.SendRequest(
-				tpktPacket,
-				func(message spi.Message) bool {
-					tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
-					if !ok {
-						return false
-					}
-					cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
-					if !ok {
-						return false
-					}
-					payload := cotpPacketData.GetPayload()
-					if payload == nil {
-						return false
-					}
-					return payload.GetTpduReference() == tpduId
-				},
-				func(message spi.Message) error {
-					// Convert the response into an
-					log.Trace().Msg("convert response to ")
-					tpktPacket := message.(readWriteModel.TPKTPacket)
-					cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
-					payload := cotpPacketData.GetPayload()
-					// Convert the s7 response into a PLC4X response
-					log.Trace().Msg("convert response to PLC4X response")
-					readResponse, err := m.ToPlc4xWriteResponse(payload, writeRequest)
-
-					if err != nil {
-						result <- &plc4goModel.DefaultPlcWriteRequestResult{
-							Request: writeRequest,
-							Err:     errors.Wrap(err, "Error decoding response"),
-						}
-						return transaction.EndRequest()
-					}
-					result <- &plc4goModel.DefaultPlcWriteRequestResult{
-						Request:  writeRequest,
-						Response: readResponse,
-					}
-					return transaction.EndRequest()
-				},
-				func(err error) error {
+			if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
+				tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
+				if !ok {
+					return false
+				}
+				cotpPacketData, ok := tpktPacket.GetPayload().(readWriteModel.COTPPacketDataExactly)
+				if !ok {
+					return false
+				}
+				payload := cotpPacketData.GetPayload()
+				if payload == nil {
+					return false
+				}
+				return payload.GetTpduReference() == tpduId
+			}, func(message spi.Message) error {
+				// Convert the response into a TPKT packet
+				log.Trace().Msg("convert response to ")
+				tpktPacket := message.(readWriteModel.TPKTPacket)
+				cotpPacketData := tpktPacket.GetPayload().(readWriteModel.COTPPacketData)
+				payload := cotpPacketData.GetPayload()
+				// Convert the s7 response into a PLC4X response
+				log.Trace().Msg("convert response to PLC4X response")
+				readResponse, err := m.ToPlc4xWriteResponse(payload, writeRequest)
+
+				if err != nil {
 					result <- &plc4goModel.DefaultPlcWriteRequestResult{
 						Request: writeRequest,
-						Err:     errors.New("got timeout while waiting for response"),
+						Err:     errors.Wrap(err, "Error decoding response"),
 					}
 					return transaction.EndRequest()
-				},
-				time.Second*1); err != nil {
+				}
+				result <- &plc4goModel.DefaultPlcWriteRequestResult{
+					Request:  writeRequest,
+					Response: readResponse,
+				}
+				return transaction.EndRequest()
+			}, func(err error) error {
+				result <- &plc4goModel.DefaultPlcWriteRequestResult{
+					Request: writeRequest,
+					Err:     errors.New("got timeout while waiting for response"),
+				}
+				return transaction.EndRequest()
+			}, time.Second*1); err != nil {
 				result <- &plc4goModel.DefaultPlcWriteRequestResult{
 					Request:  writeRequest,
 					Response: nil,
diff --git a/plc4go/spi/MessageCodec.go b/plc4go/spi/MessageCodec.go
index 55319625c..f8a537b11 100644
--- a/plc4go/spi/MessageCodec.go
+++ b/plc4go/spi/MessageCodec.go
@@ -20,16 +20,18 @@
 package spi
 
 import (
+	"context"
 	"fmt"
 	"time"
 )
 
 type Expectation interface {
+	fmt.Stringer
+	GetContext() context.Context
 	GetExpiration() time.Time
 	GetAcceptsMessage() AcceptsMessage
 	GetHandleMessage() HandleMessage
 	GetHandleError() HandleError
-	fmt.Stringer
 }
 
 // AcceptsMessage If this function returns true, the message is forwarded to the message handler
@@ -53,9 +55,9 @@ type MessageCodec interface {
 	Send(message Message) error
 	// Expect Wait for a given timespan for a message to come in, which returns 'true' for 'acceptMessage'
 	// and is then forwarded to the 'handleMessage' function
-	Expect(acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError, ttl time.Duration) error
+	Expect(ctx context.Context, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError, ttl time.Duration) error
 	// SendRequest A combination that sends a message first and then waits for a response
-	SendRequest(message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError, ttl time.Duration) error
+	SendRequest(ctx context.Context, message Message, acceptsMessage AcceptsMessage, handleMessage HandleMessage, handleError HandleError, ttl time.Duration) error
 
 	// GetDefaultIncomingMessageChannel gives back the chan where unexpected messages arrive
 	GetDefaultIncomingMessageChannel() chan Message
diff --git a/plc4go/spi/default/DefaultCodec.go b/plc4go/spi/default/DefaultCodec.go
index 8c4d967fe..6fee263e8 100644
--- a/plc4go/spi/default/DefaultCodec.go
+++ b/plc4go/spi/default/DefaultCodec.go
@@ -20,6 +20,7 @@
 package _default
 
 import (
+	"context"
 	"fmt"
 	"runtime/debug"
 	"time"
@@ -54,6 +55,7 @@ func NewDefaultCodec(requirements DefaultCodecRequirements, transportInstance tr
 }
 
 type DefaultExpectation struct {
+	Context        context.Context
 	Expiration     time.Time
 	AcceptsMessage spi.AcceptsMessage
 	HandleMessage  spi.HandleMessage
@@ -111,6 +113,10 @@ func buildDefaultCodec(defaultCodecRequirements DefaultCodecRequirements, transp
 ///////////////////////////////////////
 ///////////////////////////////////////
 
+func (m *DefaultExpectation) GetContext() context.Context {
+	return m.Context
+}
+
 func (m *DefaultExpectation) GetExpiration() time.Time {
 	return m.Expiration
 }
@@ -167,8 +173,9 @@ func (m *defaultCodec) IsRunning() bool {
 	return m.running
 }
 
-func (m *defaultCodec) Expect(acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+func (m *defaultCodec) Expect(ctx context.Context, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
 	expectation := &DefaultExpectation{
+		Context:        ctx,
 		Expiration:     time.Now().Add(ttl),
 		AcceptsMessage: acceptsMessage,
 		HandleMessage:  handleMessage,
@@ -178,14 +185,17 @@ func (m *defaultCodec) Expect(acceptsMessage spi.AcceptsMessage, handleMessage s
 	return nil
 }
 
-func (m *defaultCodec) SendRequest(message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+func (m *defaultCodec) SendRequest(ctx context.Context, message spi.Message, acceptsMessage spi.AcceptsMessage, handleMessage spi.HandleMessage, handleError spi.HandleError, ttl time.Duration) error {
+	if err := ctx.Err(); err != nil {
+		return errors.Wrap(err, "Not sending message as context is aborted")
+	}
 	log.Trace().Msg("Sending request")
 	// Send the actual message
 	err := m.Send(message)
 	if err != nil {
 		return errors.Wrap(err, "Error sending the request")
 	}
-	return m.Expect(acceptsMessage, handleMessage, handleError, ttl)
+	return m.Expect(ctx, acceptsMessage, handleMessage, handleError, ttl)
 }
 
 func (m *defaultCodec) TimeoutExpectations(now time.Time) {
@@ -195,11 +205,18 @@ func (m *defaultCodec) TimeoutExpectations(now time.Time) {
 			// Remove this expectation from the list.
 			m.expectations = append(m.expectations[:index], m.expectations[index+1:]...)
 			// Call the error handler.
-			// TODO: decouple from worker thread
-			err := expectation.GetHandleError()(plcerrors.NewTimeoutError(now.Sub(expectation.GetExpiration())))
-			if err != nil {
-				log.Error().Err(err).Msg("Got an error handling error on expectation")
-			}
+			go func() {
+				if err := expectation.GetHandleError()(plcerrors.NewTimeoutError(now.Sub(expectation.GetExpiration()))); err != nil {
+					log.Error().Err(err).Msg("Got an error handling error on expectation")
+				}
+			}()
+		}
+		if err := expectation.GetContext().Err(); err != nil {
+			go func() {
+				if err := expectation.GetHandleError()(err); err != nil {
+					log.Error().Err(err).Msg("Got an error handling error on expectation")
+				}
+			}()
 		}
 	}
 }