You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/02 13:33:03 UTC

[plc4x] branch develop updated (df293b2684 -> fe482d9305)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from df293b2684 test(plc4go/cbus): more tests for Reader
     new fcd62ce597 refactor(plc4go/cbus): split up reader into multiple methods
     new fe482d9305 refactor(plc4go/spi): introduce RequestTransactionRunnable

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/bacnetip/Reader.go             |   3 +-
 plc4go/internal/cbus/CBusMessageMapper_test.go |  44 +++----
 plc4go/internal/cbus/Reader.go                 | 168 +++++++++++++------------
 plc4go/internal/cbus/Writer.go                 |   2 +-
 plc4go/internal/eip/Reader.go                  |   2 +-
 plc4go/internal/eip/Writer.go                  |   4 +-
 plc4go/internal/s7/Reader.go                   |   2 +-
 plc4go/internal/s7/Writer.go                   |   2 +-
 plc4go/spi/RequestTransactionManager.go        |  10 +-
 plc4go/spi/RequestTransactionManager_test.go   |   6 +-
 10 files changed, 124 insertions(+), 119 deletions(-)


[plc4x] 01/02: refactor(plc4go/cbus): split up reader into multiple methods

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit fcd62ce597efa89e4064711b2ba30f1581995034
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue May 2 15:20:44 2023 +0200

    refactor(plc4go/cbus): split up reader into multiple methods
---
 plc4go/internal/cbus/Reader.go | 166 +++++++++++++++++++++--------------------
 1 file changed, 85 insertions(+), 81 deletions(-)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 1ab96bdfdc..64a2baa771 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -101,7 +101,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 			}
 			return
 		}
-		m.sendMessage(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
+		m.createMessageTransactionAndWait(ctx, messageToSend, addResponseCode, tagName, addPlcValue)
 	}
 	readResponse := spiModel.NewDefaultPlcReadResponse(readRequest, responseCodes, plcValues)
 	result <- &spiModel.DefaultPlcReadRequestResult{
@@ -110,94 +110,98 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 	}
 }
 
-func (m *Reader) sendMessage(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
+func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
 	// Start a new request-transaction (Is ended in the response-handler)
 	transaction := m.tm.StartTransaction()
 	transaction.Submit(func() {
-		// Send the  over the wire
-		log.Trace().Msg("Send ")
-		if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
-			cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
-			if !ok {
-				return false
-			}
-			messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
-			if !ok {
-				return false
-			}
-			// Check if this errored
-			if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-				// This means we must handle this below
-				return true
-			}
+		m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
+	})
+	if err := transaction.AwaitCompletion(ctx); err != nil {
+		log.Warn().Err(err).Msg("Error while awaiting completion")
+	}
+}
 
-			confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-			if !ok {
-				return false
-			}
-			return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
-		}, func(receivedMessage spi.Message) error {
-			defer func(transaction spi.RequestTransaction) {
-				// This is just to make sure we don't forget to close the transaction here
-				_ = transaction.EndRequest()
-			}(transaction)
-			// Convert the response into an
-			log.Trace().Msg("convert response to ")
-			cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
-			messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
-			if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
-				log.Trace().Msg("We got a server failure")
-				addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
-				return transaction.EndRequest()
-			}
-			replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
-			if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
-				var responseCode apiModel.PlcResponseCode
-				switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
-				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
-					responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
-				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
-					responseCode = apiModel.PlcResponseCode_INVALID_DATA
-				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
-					responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
-				case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
-					responseCode = apiModel.PlcResponseCode_INVALID_DATA
-				default:
-					return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
-				}
-				log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
-				addResponseCode(tagName, responseCode)
-				return transaction.EndRequest()
-			}
+func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction spi.RequestTransaction, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
+	// Send the  over the wire
+	log.Trace().Msg("Send ")
+	if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
+		cbusMessage, ok := receivedMessage.(readWriteModel.CBusMessageExactly)
+		if !ok {
+			return false
+		}
+		messageToClient, ok := cbusMessage.(readWriteModel.CBusMessageToClientExactly)
+		if !ok {
+			return false
+		}
+		// Check if this errored
+		if _, ok = messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+			// This means we must handle this below
+			return true
+		}
 
-			alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
-			// TODO: it could be double confirmed but this is not implemented yet
-			embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
-			if !ok {
-				log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
-				addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
-				return transaction.EndRequest()
+		confirmation, ok := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+		if !ok {
+			return false
+		}
+		return confirmation.GetConfirmation().GetAlpha().GetCharacter() == messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(readWriteModel.RequestCommand).GetAlpha().GetCharacter()
+	}, func(receivedMessage spi.Message) error {
+		defer func(transaction spi.RequestTransaction) {
+			// This is just to make sure we don't forget to close the transaction here
+			_ = transaction.EndRequest()
+		}(transaction)
+		// Convert the response into an
+		log.Trace().Msg("convert response to ")
+		cbusMessage := receivedMessage.(readWriteModel.CBusMessage)
+		messageToClient := cbusMessage.(readWriteModel.CBusMessageToClient)
+		if _, ok := messageToClient.GetReply().(readWriteModel.ServerErrorReplyExactly); ok {
+			log.Trace().Msg("We got a server failure")
+			addResponseCode(tagName, apiModel.PlcResponseCode_INVALID_DATA)
+			return transaction.EndRequest()
+		}
+		replyOrConfirmationConfirmation := messageToClient.GetReply().(readWriteModel.ReplyOrConfirmationConfirmationExactly)
+		if !replyOrConfirmationConfirmation.GetConfirmation().GetIsSuccess() {
+			var responseCode apiModel.PlcResponseCode
+			switch replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType() {
+			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TO_MANY_RE_TRANSMISSIONS:
+				responseCode = apiModel.PlcResponseCode_REMOTE_ERROR
+			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_CORRUPTION:
+				responseCode = apiModel.PlcResponseCode_INVALID_DATA
+			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_SYNC_LOSS:
+				responseCode = apiModel.PlcResponseCode_REMOTE_BUSY
+			case readWriteModel.ConfirmationType_NOT_TRANSMITTED_TOO_LONG:
+				responseCode = apiModel.PlcResponseCode_INVALID_DATA
+			default:
+				return transaction.FailRequest(errors.Errorf("Every code should be mapped here: %v", replyOrConfirmationConfirmation.GetConfirmation().GetConfirmationType()))
 			}
+			log.Trace().Msgf("Was no success %s:%v", tagName, responseCode)
+			addResponseCode(tagName, responseCode)
+			return transaction.EndRequest()
+		}
 
-			log.Trace().Msg("Handling confirmed data")
-			// TODO: check if we can use a plcValueSerializer
-			encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
-			if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
-				return errors.Wrap(err, "error encoding reply")
-			}
+		alpha := replyOrConfirmationConfirmation.GetConfirmation().GetAlpha()
+		// TODO: it could be double confirmed but this is not implemented yet
+		embeddedReply, ok := replyOrConfirmationConfirmation.GetEmbeddedReply().(readWriteModel.ReplyOrConfirmationReplyExactly)
+		if !ok {
+			log.Trace().Msgf("Is a confirm only, no data. Alpha: %c", alpha.GetCharacter())
+			addResponseCode(tagName, apiModel.PlcResponseCode_NOT_FOUND)
 			return transaction.EndRequest()
-		}, func(err error) error {
-			addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
-			return transaction.FailRequest(err)
-		}, time.Second*1); err != nil {
-			log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
-			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
-			if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
-				log.Debug().Err(err).Msg("Error failing request")
-			}
 		}
-	})
-	if err := transaction.AwaitCompletion(ctx); err != nil {
-		log.Warn().Err(err).Msg("Error while awaiting completion")
+
+		log.Trace().Msg("Handling confirmed data")
+		// TODO: check if we can use a plcValueSerializer
+		encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
+		if err := MapEncodedReply(transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
+			return errors.Wrap(err, "error encoding reply")
+		}
+		return transaction.EndRequest()
+	}, func(err error) error {
+		addResponseCode(tagName, apiModel.PlcResponseCode_REQUEST_TIMEOUT)
+		return transaction.FailRequest(err)
+	}, time.Second*1); err != nil {
+		log.Debug().Err(err).Msgf("Error sending message for tag %s", tagName)
+		addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+		if err := transaction.FailRequest(errors.Errorf("timeout after %ss", time.Second*1)); err != nil {
+			log.Debug().Err(err).Msg("Error failing request")
+		}
 	}
 }


[plc4x] 02/02: refactor(plc4go/spi): introduce RequestTransactionRunnable

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit fe482d9305136b0a012f3dc86a005fdbab875ab1
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue May 2 15:32:55 2023 +0200

    refactor(plc4go/spi): introduce RequestTransactionRunnable
    
    The runnable receives the current transaction as its argument, so it doesn't need to be passed in from the outside
---
 plc4go/internal/bacnetip/Reader.go             |  3 +-
 plc4go/internal/cbus/CBusMessageMapper_test.go | 44 +++++++++++++-------------
 plc4go/internal/cbus/Reader.go                 |  2 +-
 plc4go/internal/cbus/Writer.go                 |  2 +-
 plc4go/internal/eip/Reader.go                  |  2 +-
 plc4go/internal/eip/Writer.go                  |  4 +--
 plc4go/internal/s7/Reader.go                   |  2 +-
 plc4go/internal/s7/Writer.go                   |  2 +-
 plc4go/spi/RequestTransactionManager.go        | 10 +++---
 plc4go/spi/RequestTransactionManager_test.go   |  6 ++--
 10 files changed, 39 insertions(+), 38 deletions(-)

diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index d67378150e..e67149466d 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -130,8 +130,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 
 		// Start a new request-transaction (Is ended in the response-handler)
 		transaction := m.tm.StartTransaction()
-		transaction.Submit(func() {
-
+		transaction.Submit(func(transaction spi.RequestTransaction) {
 			// Send the  over the wire
 			log.Trace().Msg("Send ")
 			if err := m.messageCodec.SendRequest(ctx, apdu, func(message spi.Message) bool {
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index acf788b02b..4da9984e32 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1453,7 +1453,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1478,7 +1478,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1540,7 +1540,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1602,7 +1602,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1656,7 +1656,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1700,7 +1700,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1747,7 +1747,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1814,7 +1814,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1899,7 +1899,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1947,7 +1947,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1991,7 +1991,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2035,7 +2035,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2079,7 +2079,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2123,7 +2123,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2187,7 +2187,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2231,7 +2231,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2275,7 +2275,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2319,7 +2319,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2363,7 +2363,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2407,7 +2407,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2467,7 +2467,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2511,7 +2511,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 64a2baa771..967ec3ae13 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -113,7 +113,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
 	// Start a new request-transaction (Is ended in the response-handler)
 	transaction := m.tm.StartTransaction()
-	transaction.Submit(func() {
+	transaction.Submit(func(transaction spi.RequestTransaction) {
 		m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
 	})
 	if err := transaction.AwaitCompletion(ctx); err != nil {
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 42d6fc8f3f..46247c1ad4 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -101,7 +101,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
 			tagNameCopy := tagName
 			// Start a new request-transaction (Is ended in the response-handler)
 			transaction := m.tm.StartTransaction()
-			transaction.Submit(func() {
+			transaction.Submit(func(transaction spi.RequestTransaction) {
 				// Send the  over the wire
 				log.Trace().Msg("Send ")
 				if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 6a77f8f6cc..6526071542 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -86,7 +86,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 			}
 			request := readWriteModel.NewCipRRData(0, 0, typeIds, *m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), 0)
 			transaction := m.tm.StartTransaction()
-			transaction.Submit(func() {
+			transaction.Submit(func(transaction spi.RequestTransaction) {
 				if err := m.messageCodec.SendRequest(ctx, request,
 					func(message spi.Message) bool {
 						eipPacket := message.(readWriteModel.EipPacket)
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index b8d2d10d6e..b77256611a 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -120,7 +120,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 					)
 					// Start a new request-transaction (Is ended in the response-handler)
 					transaction := m.tm.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// Send the  over the wire
 						if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
 							eipPacket := message.(readWriteModel.EipPacket)
@@ -212,7 +212,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 					)
 					// Start a new request-transaction (Is ended in the response-handler)
 					transaction := m.tm.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// Send the  over the wire
 						if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
 							eipPacket := message.(readWriteModel.EipPacket)
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 3379200e78..92ae8a9686 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -95,7 +95,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 		)
 		// Start a new request-transaction (Is ended in the response-handler)
 		transaction := m.tm.StartTransaction()
-		transaction.Submit(func() {
+		transaction.Submit(func(transaction spi.RequestTransaction) {
 
 			// Send the  over the wire
 			log.Trace().Msg("Send ")
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index e8bf221953..5333146918 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -100,7 +100,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 
 		// Start a new request-transaction (Is ended in the response-handler)
 		transaction := m.tm.StartTransaction()
-		transaction.Submit(func() {
+		transaction.Submit(func(transaction spi.RequestTransaction) {
 			// Send the  over the wire
 			if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
 				tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index de9f461aa2..cf14625ad6 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -42,6 +42,8 @@ func init() {
 	sharedExecutorInstance.Start()
 }
 
+type RequestTransactionRunnable func(transaction RequestTransaction)
+
 // RequestTransaction represents a transaction
 type RequestTransaction interface {
 	fmt.Stringer
@@ -49,8 +51,8 @@ type RequestTransaction interface {
 	FailRequest(err error) error
 	// EndRequest signals that this transaction is done
 	EndRequest() error
-	// Submit submits a Runnable to the RequestTransactionManager
-	Submit(operation utils.Runnable)
+	// Submit submits a RequestTransactionRunnable to the RequestTransactionManager
+	Submit(operation RequestTransactionRunnable)
 	// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
 	AwaitCompletion(ctx context.Context) error
 }
@@ -224,14 +226,14 @@ func (t *requestTransaction) EndRequest() error {
 	return t.parent.endRequest(t)
 }
 
-func (t *requestTransaction) Submit(operation utils.Runnable) {
+func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
 	if t.operation != nil {
 		log.Warn().Msg("Operation already set")
 	}
 	t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
 	t.operation = func() {
 		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
-		operation()
+		operation(t)
 		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
 	}
 	t.parent.submitTransaction(t)
diff --git a/plc4go/spi/RequestTransactionManager_test.go b/plc4go/spi/RequestTransactionManager_test.go
index e746b7569a..0a590bef89 100644
--- a/plc4go/spi/RequestTransactionManager_test.go
+++ b/plc4go/spi/RequestTransactionManager_test.go
@@ -622,7 +622,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 		transactionLog   zerolog.Logger
 	}
 	type args struct {
-		operation utils.Runnable
+		operation RequestTransactionRunnable
 	}
 	tests := []struct {
 		name   string
@@ -635,7 +635,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 				parent: &requestTransactionManager{},
 			},
 			args: args{
-				operation: func() {
+				operation: func(_ RequestTransaction) {
 					// NOOP
 				},
 			},
@@ -649,7 +649,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 				},
 			},
 			args: args{
-				operation: func() {
+				operation: func(_ RequestTransaction) {
 					// NOOP
 				},
 			},