You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/02 13:33:05 UTC

[plc4x] 02/02: refactor(plc4go/spi): introduce RequestTransactionRunnable

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit fe482d9305136b0a012f3dc86a005fdbab875ab1
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue May 2 15:32:55 2023 +0200

    refactor(plc4go/spi): introduce RequestTransactionRunnable
    
    This runnable contains the current transaction as argument so it doesn't need to be passed from the outside
---
 plc4go/internal/bacnetip/Reader.go             |  3 +-
 plc4go/internal/cbus/CBusMessageMapper_test.go | 44 +++++++++++++-------------
 plc4go/internal/cbus/Reader.go                 |  2 +-
 plc4go/internal/cbus/Writer.go                 |  2 +-
 plc4go/internal/eip/Reader.go                  |  2 +-
 plc4go/internal/eip/Writer.go                  |  4 +--
 plc4go/internal/s7/Reader.go                   |  2 +-
 plc4go/internal/s7/Writer.go                   |  2 +-
 plc4go/spi/RequestTransactionManager.go        | 10 +++---
 plc4go/spi/RequestTransactionManager_test.go   |  6 ++--
 10 files changed, 39 insertions(+), 38 deletions(-)

diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index d67378150e..e67149466d 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -130,8 +130,7 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 
 		// Start a new request-transaction (Is ended in the response-handler)
 		transaction := m.tm.StartTransaction()
-		transaction.Submit(func() {
-
+		transaction.Submit(func(transaction spi.RequestTransaction) {
 			// Send the  over the wire
 			log.Trace().Msg("Send ")
 			if err := m.messageCodec.SendRequest(ctx, apdu, func(message spi.Message) bool {
diff --git a/plc4go/internal/cbus/CBusMessageMapper_test.go b/plc4go/internal/cbus/CBusMessageMapper_test.go
index acf788b02b..4da9984e32 100644
--- a/plc4go/internal/cbus/CBusMessageMapper_test.go
+++ b/plc4go/internal/cbus/CBusMessageMapper_test.go
@@ -1453,7 +1453,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1478,7 +1478,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1540,7 +1540,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1602,7 +1602,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1656,7 +1656,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1700,7 +1700,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1747,7 +1747,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1814,7 +1814,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1899,7 +1899,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1947,7 +1947,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -1991,7 +1991,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2035,7 +2035,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2079,7 +2079,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2123,7 +2123,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2187,7 +2187,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2231,7 +2231,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2275,7 +2275,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2319,7 +2319,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2363,7 +2363,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2407,7 +2407,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2467,7 +2467,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
@@ -2511,7 +2511,7 @@ func TestMapEncodedReply(t *testing.T) {
 				transaction: func() spi.RequestTransaction {
 					transactionManager := spi.NewRequestTransactionManager(1)
 					transaction := transactionManager.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// NO-OP
 					})
 					return transaction
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 64a2baa771..967ec3ae13 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -113,7 +113,7 @@ func (m *Reader) readSync(ctx context.Context, readRequest apiModel.PlcReadReque
 func (m *Reader) createMessageTransactionAndWait(ctx context.Context, messageToSend readWriteModel.CBusMessage, addResponseCode func(name string, responseCode apiModel.PlcResponseCode), tagName string, addPlcValue func(name string, plcValue apiValues.PlcValue)) {
 	// Start a new request-transaction (Is ended in the response-handler)
 	transaction := m.tm.StartTransaction()
-	transaction.Submit(func() {
+	transaction.Submit(func(transaction spi.RequestTransaction) {
 		m.sendMessageOverTheWire(ctx, transaction, messageToSend, addResponseCode, tagName, addPlcValue)
 	})
 	if err := transaction.AwaitCompletion(ctx); err != nil {
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 42d6fc8f3f..46247c1ad4 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -101,7 +101,7 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
 			tagNameCopy := tagName
 			// Start a new request-transaction (Is ended in the response-handler)
 			transaction := m.tm.StartTransaction()
-			transaction.Submit(func() {
+			transaction.Submit(func(transaction spi.RequestTransaction) {
 				// Send the  over the wire
 				log.Trace().Msg("Send ")
 				if err := m.messageCodec.SendRequest(ctx, messageToSend, func(receivedMessage spi.Message) bool {
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 6a77f8f6cc..6526071542 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -86,7 +86,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 			}
 			request := readWriteModel.NewCipRRData(0, 0, typeIds, *m.sessionHandle, uint32(readWriteModel.CIPStatus_Success), []byte(DefaultSenderContext), 0)
 			transaction := m.tm.StartTransaction()
-			transaction.Submit(func() {
+			transaction.Submit(func(transaction spi.RequestTransaction) {
 				if err := m.messageCodec.SendRequest(ctx, request,
 					func(message spi.Message) bool {
 						eipPacket := message.(readWriteModel.EipPacket)
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index b8d2d10d6e..b77256611a 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -120,7 +120,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 					)
 					// Start a new request-transaction (Is ended in the response-handler)
 					transaction := m.tm.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// Send the  over the wire
 						if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
 							eipPacket := message.(readWriteModel.EipPacket)
@@ -212,7 +212,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 					)
 					// Start a new request-transaction (Is ended in the response-handler)
 					transaction := m.tm.StartTransaction()
-					transaction.Submit(func() {
+					transaction.Submit(func(transaction spi.RequestTransaction) {
 						// Send the  over the wire
 						if err := m.messageCodec.SendRequest(ctx, pkt, func(message spi.Message) bool {
 							eipPacket := message.(readWriteModel.EipPacket)
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 3379200e78..92ae8a9686 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -95,7 +95,7 @@ func (m *Reader) Read(ctx context.Context, readRequest model.PlcReadRequest) <-c
 		)
 		// Start a new request-transaction (Is ended in the response-handler)
 		transaction := m.tm.StartTransaction()
-		transaction.Submit(func() {
+		transaction.Submit(func(transaction spi.RequestTransaction) {
 
 			// Send the  over the wire
 			log.Trace().Msg("Send ")
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index e8bf221953..5333146918 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -100,7 +100,7 @@ func (m Writer) Write(ctx context.Context, writeRequest model.PlcWriteRequest) <
 
 		// Start a new request-transaction (Is ended in the response-handler)
 		transaction := m.tm.StartTransaction()
-		transaction.Submit(func() {
+		transaction.Submit(func(transaction spi.RequestTransaction) {
 			// Send the  over the wire
 			if err := m.messageCodec.SendRequest(ctx, tpktPacket, func(message spi.Message) bool {
 				tpktPacket, ok := message.(readWriteModel.TPKTPacketExactly)
diff --git a/plc4go/spi/RequestTransactionManager.go b/plc4go/spi/RequestTransactionManager.go
index de9f461aa2..cf14625ad6 100644
--- a/plc4go/spi/RequestTransactionManager.go
+++ b/plc4go/spi/RequestTransactionManager.go
@@ -42,6 +42,8 @@ func init() {
 	sharedExecutorInstance.Start()
 }
 
+type RequestTransactionRunnable func(transaction RequestTransaction)
+
 // RequestTransaction represents a transaction
 type RequestTransaction interface {
 	fmt.Stringer
@@ -49,8 +51,8 @@ type RequestTransaction interface {
 	FailRequest(err error) error
 	// EndRequest signals that this transaction is done
 	EndRequest() error
-	// Submit submits a Runnable to the RequestTransactionManager
-	Submit(operation utils.Runnable)
+	// Submit submits a RequestTransactionRunnable to the RequestTransactionManager
+	Submit(operation RequestTransactionRunnable)
 	// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
 	AwaitCompletion(ctx context.Context) error
 }
@@ -224,14 +226,14 @@ func (t *requestTransaction) EndRequest() error {
 	return t.parent.endRequest(t)
 }
 
-func (t *requestTransaction) Submit(operation utils.Runnable) {
+func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
 	if t.operation != nil {
 		log.Warn().Msg("Operation already set")
 	}
 	t.transactionLog.Trace().Msgf("Submission of transaction %d", t.transactionId)
 	t.operation = func() {
 		t.transactionLog.Trace().Msgf("Start execution of transaction %d", t.transactionId)
-		operation()
+		operation(t)
 		t.transactionLog.Trace().Msgf("Completed execution of transaction %d", t.transactionId)
 	}
 	t.parent.submitTransaction(t)
diff --git a/plc4go/spi/RequestTransactionManager_test.go b/plc4go/spi/RequestTransactionManager_test.go
index e746b7569a..0a590bef89 100644
--- a/plc4go/spi/RequestTransactionManager_test.go
+++ b/plc4go/spi/RequestTransactionManager_test.go
@@ -622,7 +622,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 		transactionLog   zerolog.Logger
 	}
 	type args struct {
-		operation utils.Runnable
+		operation RequestTransactionRunnable
 	}
 	tests := []struct {
 		name   string
@@ -635,7 +635,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 				parent: &requestTransactionManager{},
 			},
 			args: args{
-				operation: func() {
+				operation: func(_ RequestTransaction) {
 					// NOOP
 				},
 			},
@@ -649,7 +649,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 				},
 			},
 			args: args{
-				operation: func() {
+				operation: func(_ RequestTransaction) {
 					// NOOP
 				},
 			},