You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 14:03:52 UTC

[plc4x] branch develop updated (2ff14f33c0 -> b9c89ebea3)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 2ff14f33c0 fix(plc4go): pass loggers where missing
     new 25480b1d22 fix(plc4go): transaction should now be properly handled
     new b9c89ebea3 fix(plc4go/spi): harden request transaction manager implementation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/bacnetip/Connection.go             |   2 +-
 plc4go/internal/bacnetip/Reader.go                 |  12 +-
 plc4go/internal/cbus/Reader.go                     |   4 -
 plc4go/internal/cbus/Writer.go                     |   4 +-
 .../internal/cbus/mock_RequestTransaction_test.go  |  41 ++++
 plc4go/internal/eip/Reader.go                      |   4 +-
 plc4go/internal/eip/Writer.go                      |   8 +-
 plc4go/internal/s7/Reader.go                       |   4 +-
 plc4go/internal/s7/Writer.go                       |   4 +-
 .../spi/transactions/RequestTransactionManager.go  |  63 ++++-
 .../transactions/RequestTransactionManager_test.go | 253 ++++++++++++++++++---
 .../{pool => transactions}/mock_Executor_test.go   |  21 +-
 .../transactions/mock_RequestTransaction_test.go   |  41 ++++
 plc4go/spi/transactions/mock_requirements.go       |   5 +
 14 files changed, 404 insertions(+), 62 deletions(-)
 copy plc4go/spi/{pool => transactions}/mock_Executor_test.go (90%)


[plc4x] 01/02: fix(plc4go): transaction should now be properly handled

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 25480b1d22a08f863ba15383d364e2b29605e35c
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 15:17:00 2023 +0200

    fix(plc4go): transaction should now be properly handled
---
 plc4go/internal/bacnetip/Connection.go |  2 +-
 plc4go/internal/bacnetip/Reader.go     | 12 ++++++++++--
 plc4go/internal/cbus/Reader.go         |  4 ----
 plc4go/internal/cbus/Writer.go         |  4 +++-
 plc4go/internal/eip/Reader.go          |  4 +++-
 plc4go/internal/eip/Writer.go          |  8 ++++++--
 plc4go/internal/s7/Reader.go           |  4 +++-
 plc4go/internal/s7/Writer.go           |  4 +++-
 8 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 516c4b1fe5..5c19e558d9 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -127,7 +127,7 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 }
 
 func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
-	return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm))
+	return spiModel.NewDefaultPlcReadRequestBuilder(c.GetPlcTagHandler(), NewReader(&c.invokeIdGenerator, c.messageCodec, c.tm, options.WithCustomLogger(c.log)))
 }
 
 func (c *Connection) SubscriptionRequestBuilder() apiModel.PlcSubscriptionRequestBuilder {
diff --git a/plc4go/internal/bacnetip/Reader.go b/plc4go/internal/bacnetip/Reader.go
index e8e1f5b1cc..1ee5239bc9 100644
--- a/plc4go/internal/bacnetip/Reader.go
+++ b/plc4go/internal/bacnetip/Reader.go
@@ -22,7 +22,9 @@ package bacnetip
 import (
 	"context"
 	"fmt"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
+	"github.com/rs/zerolog"
 	"time"
 
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
@@ -42,9 +44,11 @@ type Reader struct {
 
 	maxSegmentsAccepted   readWriteModel.MaxSegmentsAccepted
 	maxApduLengthAccepted readWriteModel.MaxApduLengthAccepted
+
+	log zerolog.Logger
 }
 
-func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager) *Reader {
+func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCodec, tm transactions.RequestTransactionManager, _options ...options.WithOption) *Reader {
 	return &Reader{
 		invokeIdGenerator: invokeIdGenerator,
 		messageCodec:      messageCodec,
@@ -52,6 +56,8 @@ func NewReader(invokeIdGenerator *InvokeIdGenerator, messageCodec spi.MessageCod
 
 		maxSegmentsAccepted:   readWriteModel.MaxSegmentsAccepted_MORE_THAN_64_SEGMENTS,
 		maxApduLengthAccepted: readWriteModel.MaxApduLengthAccepted_NUM_OCTETS_1476,
+
+		log: options.ExtractCustomLogger(_options...),
 	}
 }
 
@@ -190,7 +196,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 					nil,
 					errors.Wrap(err, "error sending message"),
 				)
-				_ = transaction.EndRequest()
+				if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+					m.log.Debug().Err(err).Msg("Error failing request")
+				}
 			}
 		})
 	}()
diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 2382a20e1b..e57f344224 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -156,10 +156,6 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
 		expectedAlpha := messageToSend.(readWriteModel.CBusMessageToServer).GetRequest().(interface{ GetAlpha() readWriteModel.Alpha }).GetAlpha().GetCharacter()
 		return actualAlpha == expectedAlpha
 	}, func(receivedMessage spi.Message) error {
-		defer func(transaction transactions.RequestTransaction) {
-			// This is just to make sure we don't forget to close the transaction here
-			_ = transaction.EndRequest()
-		}(transaction)
 		// Convert the response into an
 		m.log.Trace().Msg("convert response to ")
 		messageToClient := receivedMessage.(readWriteModel.CBusMessageToClient)
diff --git a/plc4go/internal/cbus/Writer.go b/plc4go/internal/cbus/Writer.go
index 35b13f71dc..8dbaf855fe 100644
--- a/plc4go/internal/cbus/Writer.go
+++ b/plc4go/internal/cbus/Writer.go
@@ -144,7 +144,9 @@ func (m *Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteReques
 				}, time.Second*1); err != nil {
 					m.log.Debug().Err(err).Msgf("Error sending message for tag %s", tagNameCopy)
 					addResponseCode(tagNameCopy, apiModel.PlcResponseCode_INTERNAL_ERROR)
-					_ = transaction.EndRequest()
+					if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+						m.log.Debug().Err(err).Msg("Error failing request")
+					}
 				}
 			})
 		}
diff --git a/plc4go/internal/eip/Reader.go b/plc4go/internal/eip/Reader.go
index 61b5503c6f..a38a0d2ce0 100644
--- a/plc4go/internal/eip/Reader.go
+++ b/plc4go/internal/eip/Reader.go
@@ -141,7 +141,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 						nil,
 						errors.Wrap(err, "error sending message"),
 					)
-					_ = transaction.EndRequest()
+					if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+						m.log.Debug().Err(err).Msg("Error failing request")
+					}
 				}
 			})
 		}
diff --git a/plc4go/internal/eip/Writer.go b/plc4go/internal/eip/Writer.go
index 52cef6d3f3..b04eb3c61d 100644
--- a/plc4go/internal/eip/Writer.go
+++ b/plc4go/internal/eip/Writer.go
@@ -172,7 +172,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
 							return transaction.EndRequest()
 						}, time.Second*1); err != nil {
 							result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil,      errors.Wrap(err, "error sending message"))
-							_ = transaction.EndRequest()
+							if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+								m.log.Debug().Err(err).Msg("Error failing request")
+							}
 						}
 					})
 				} else {
@@ -263,7 +265,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
 							return transaction.EndRequest()
 						}, time.Second*1); err != nil {
 							result <- spiModel.NewDefaultPlcWriteRequestResult( writeRequest, nil,      errors.Wrap(err, "error sending message"))
-							_ = transaction.EndRequest()
+							if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+								m.log.Debug().Err(err).Msg("Error failing request")
+							}
 						}
 					})
 				}*/
diff --git a/plc4go/internal/s7/Reader.go b/plc4go/internal/s7/Reader.go
index 7b34d87748..59ea75bc28 100644
--- a/plc4go/internal/s7/Reader.go
+++ b/plc4go/internal/s7/Reader.go
@@ -161,7 +161,9 @@ func (m *Reader) Read(ctx context.Context, readRequest apiModel.PlcReadRequest)
 					nil,
 					errors.Wrap(err, "error sending message"),
 				)
-				_ = transaction.EndRequest()
+				if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+					m.log.Debug().Err(err).Msg("Error failing request")
+				}
 			}
 		})
 	}()
diff --git a/plc4go/internal/s7/Writer.go b/plc4go/internal/s7/Writer.go
index bb67795494..22761cd09a 100644
--- a/plc4go/internal/s7/Writer.go
+++ b/plc4go/internal/s7/Writer.go
@@ -149,7 +149,9 @@ func (m Writer) Write(ctx context.Context, writeRequest apiModel.PlcWriteRequest
 				return transaction.EndRequest()
 			}, time.Second*1); err != nil {
 				result <- spiModel.NewDefaultPlcWriteRequestResult(writeRequest, nil, errors.Wrap(err, "error sending message"))
-				_ = transaction.EndRequest()
+				if err := transaction.FailRequest(errors.Errorf("timeout after %s", time.Second*1)); err != nil {
+					m.log.Debug().Err(err).Msg("Error failing request")
+				}
 			}
 		})
 	}()


[plc4x] 02/02: fix(plc4go/spi): harden request transaction manager implementation

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit b9c89ebea3d57a1153919e05cb947d61c12f282e
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 16:03:43 2023 +0200

    fix(plc4go/spi): harden request transaction manager implementation
---
 .../internal/cbus/mock_RequestTransaction_test.go  |  41 ++++
 .../spi/transactions/RequestTransactionManager.go  |  63 ++++-
 .../transactions/RequestTransactionManager_test.go | 253 ++++++++++++++++++---
 plc4go/spi/transactions/mock_Executor_test.go      | 249 ++++++++++++++++++++
 .../transactions/mock_RequestTransaction_test.go   |  41 ++++
 plc4go/spi/transactions/mock_requirements.go       |   5 +
 6 files changed, 613 insertions(+), 39 deletions(-)

diff --git a/plc4go/internal/cbus/mock_RequestTransaction_test.go b/plc4go/internal/cbus/mock_RequestTransaction_test.go
index ae6861665b..6e81fdd880 100644
--- a/plc4go/internal/cbus/mock_RequestTransaction_test.go
+++ b/plc4go/internal/cbus/mock_RequestTransaction_test.go
@@ -166,6 +166,47 @@ func (_c *MockRequestTransaction_FailRequest_Call) RunAndReturn(run func(error)
 	return _c
 }
 
+// IsCompleted provides a mock function with given fields:
+func (_m *MockRequestTransaction) IsCompleted() bool {
+	ret := _m.Called()
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func() bool); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockRequestTransaction_IsCompleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCompleted'
+type MockRequestTransaction_IsCompleted_Call struct {
+	*mock.Call
+}
+
+// IsCompleted is a helper method to define mock.On call
+func (_e *MockRequestTransaction_Expecter) IsCompleted() *MockRequestTransaction_IsCompleted_Call {
+	return &MockRequestTransaction_IsCompleted_Call{Call: _e.mock.On("IsCompleted")}
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) Run(run func()) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) Return(_a0 bool) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) RunAndReturn(run func() bool) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // String provides a mock function with given fields:
 func (_m *MockRequestTransaction) String() string {
 	ret := _m.Called()
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index edd287c6fa..c06e0de71e 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -35,7 +35,6 @@ import (
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
-	"github.com/rs/zerolog/log"
 )
 
 var sharedExecutorInstance pool.Executor // shared instance
@@ -58,6 +57,8 @@ type RequestTransaction interface {
 	Submit(operation RequestTransactionRunnable)
 	// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
 	AwaitCompletion(ctx context.Context) error
+	// IsCompleted indicates that this RequestTransaction is completed
+	IsCompleted() bool
 }
 
 // RequestTransactionManager handles transactions
@@ -79,6 +80,8 @@ func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...op
 		workLog:                    *list.New(),
 		executor:                   sharedExecutorInstance,
 
+		traceTransactionManagerTransactions: config.TraceTransactionManagerTransactions,
+
 		log: options.ExtractCustomLogger(_options...),
 	}
 	for _, option := range _options {
@@ -114,6 +117,9 @@ type requestTransaction struct {
 	operation        pool.Runnable
 	completionFuture pool.CompletionFuture
 
+	stateChangeMutex sync.Mutex
+	completed        bool
+
 	transactionLog zerolog.Logger
 }
 
@@ -129,8 +135,12 @@ type requestTransactionManager struct {
 	workLogMutex sync.RWMutex
 	executor     pool.Executor
 
+	// Indicates if this rtm is in shutdown
 	shutdown bool
 
+	// flag set to true if it should trace transactions
+	traceTransactionManagerTransactions bool
+
 	log zerolog.Logger
 }
 
@@ -179,26 +189,35 @@ func (r *requestTransactionManager) processWorklog() {
 	}
 }
 
+type completedFuture struct {
+	err error
+}
+
+func (c completedFuture) AwaitCompletion(_ context.Context) error {
+	return c.err
+}
+
+func (completedFuture) Cancel(_ bool, _ error) {
+	// No op
+}
+
 func (r *requestTransactionManager) StartTransaction() RequestTransaction {
 	r.transactionMutex.Lock()
 	defer r.transactionMutex.Unlock()
 	currentTransactionId := r.transactionId
 	r.transactionId += 1
-	transactionLogger := log.With().Int32("transactionId", currentTransactionId).Logger()
-	if !config.TraceTransactionManagerTransactions {
+	transactionLogger := r.log.With().Int32("transactionId", currentTransactionId).Logger()
+	if !r.traceTransactionManagerTransactions {
 		transactionLogger = zerolog.Nop()
 	}
 	transaction := &requestTransaction{
-		r,
-		currentTransactionId,
-		nil,
-		nil,
-		transactionLogger,
+		parent:         r,
+		transactionId:  currentTransactionId,
+		transactionLog: transactionLogger,
 	}
 	if r.shutdown {
-		if err := r.failRequest(transaction, errors.New("request transaction manager in shutdown")); err != nil {
-			r.log.Error().Err(err).Msg("error shutting down transaction")
-		}
+		transaction.completed = true
+		transaction.completionFuture = completedFuture{errors.New("request transaction manager in shutdown")}
 	}
 	return transaction
 }
@@ -271,17 +290,35 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 }
 
 func (t *requestTransaction) FailRequest(err error) error {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		return errors.Wrap(err, "calling fail on a already completed transaction")
+	}
 	t.transactionLog.Trace().Msg("Fail the request")
+	t.completed = true
 	return t.parent.failRequest(t, err)
 }
 
 func (t *requestTransaction) EndRequest() error {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		return errors.New("calling end on a already completed transaction")
+	}
 	t.transactionLog.Trace().Msg("Ending the request")
+	t.completed = true
 	// Remove it from Running Requests
 	return t.parent.endRequest(t)
 }
 
 func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		t.transactionLog.Warn().Msg("calling submit on a already completed transaction")
+		return
+	}
 	if t.operation != nil {
 		t.transactionLog.Warn().Msg("Operation already set")
 	}
@@ -315,6 +352,10 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
 	return nil
 }
 
+func (t *requestTransaction) IsCompleted() bool {
+	return t.completed
+}
+
 func (t *requestTransaction) String() string {
 	return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
 }
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index ae71055f07..7384f87871 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -22,8 +22,11 @@ package transactions
 import (
 	"container/list"
 	"context"
+	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
+	"github.com/apache/plc4x/plc4go/spi/testutils"
+	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -145,38 +148,63 @@ func Test_requestTransactionManager_SetNumberOfConcurrentRequests(t *testing.T)
 
 func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 	type fields struct {
-		runningRequests            []*requestTransaction
-		numberOfConcurrentRequests int
-		transactionId              int32
-		workLog                    list.List
-		executor                   pool.Executor
+		runningRequests                     []*requestTransaction
+		numberOfConcurrentRequests          int
+		transactionId                       int32
+		workLog                             list.List
+		executor                            pool.Executor
+		shutdown                            bool
+		traceTransactionManagerTransactions bool
+		log                                 zerolog.Logger
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		want   RequestTransaction
+		name       string
+		fields     fields
+		setup      func(t *testing.T, fields *fields)
+		wantAssert func(t *testing.T, requestTransaction RequestTransaction) bool
 	}{
 		{
 			name: "start one",
-			want: &requestTransaction{
-				parent: &requestTransactionManager{
-					transactionId: 1,
-				},
-				transactionLog: zerolog.Nop(),
+			setup: func(t *testing.T, fields *fields) {
+				fields.log = testutils.ProduceTestingLogger(t)
+			},
+			wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
+				assert.False(t, requestTransaction.IsCompleted())
+				return true
+			},
+		},
+		{
+			name: "start one in shutdown",
+			fields: fields{
+				shutdown: true,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.log = testutils.ProduceTestingLogger(t)
+			},
+			wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
+				assert.True(t, requestTransaction.IsCompleted())
+				assert.Error(t, requestTransaction.AwaitCompletion(context.Background()))
+				return true
 			},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
 			r := &requestTransactionManager{
-				runningRequests:            tt.fields.runningRequests,
-				numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
-				transactionId:              tt.fields.transactionId,
-				workLog:                    tt.fields.workLog,
-				executor:                   tt.fields.executor,
+				runningRequests:                     tt.fields.runningRequests,
+				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
+				transactionId:                       tt.fields.transactionId,
+				workLog:                             tt.fields.workLog,
+				executor:                            tt.fields.executor,
+				shutdown:                            tt.fields.shutdown,
+				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+				log:                                 tt.fields.log,
 			}
-			if got := r.StartTransaction(); !assert.Equal(t, tt.want, got) {
-				t.Errorf("StartTransaction() = %v, want %v", got, tt.want)
+			if got := r.StartTransaction(); !assert.True(t, tt.wantAssert(t, got)) {
+				t.Errorf("StartTransaction() = %v", got)
 			}
 		})
 	}
@@ -241,6 +269,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 		transactionId              int32
 		workLog                    list.List
 		executor                   pool.Executor
+		log                        zerolog.Logger
 	}
 	type args struct {
 		transaction *requestTransaction
@@ -259,6 +288,8 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 				transaction: &requestTransaction{},
 			},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
+				fields.log = testutils.ProduceTestingLogger(t)
+
 				completionFuture := NewMockCompletionFuture(t)
 				expect := completionFuture.EXPECT()
 				expect.Cancel(true, nil).Return()
@@ -278,6 +309,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 				transactionId:              tt.fields.transactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
+				log:                        tt.fields.log,
 			}
 			if err := r.failRequest(tt.args.transaction, tt.args.err); (err != nil) != tt.wantErr {
 				t.Errorf("failRequest() error = %v, wantErr %v", err, tt.wantErr)
@@ -497,6 +529,7 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
 		operation        pool.Runnable
 		completionFuture pool.CompletionFuture
 		transactionLog   zerolog.Logger
+		completed        bool
 	}
 	tests := []struct {
 		name    string
@@ -510,6 +543,14 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
 			},
 			wantErr: true,
 		},
+		{
+			name: "end it completed",
+			fields: fields{
+				parent:    &requestTransactionManager{},
+				completed: true,
+			},
+			wantErr: true,
+		},
 	}
 	for _, tt := range tests {
 		t1.Run(tt.name, func(t1 *testing.T) {
@@ -519,6 +560,7 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
 				operation:        tt.fields.operation,
 				completionFuture: tt.fields.completionFuture,
 				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
 			}
 			if err := t.EndRequest(); (err != nil) != tt.wantErr {
 				t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr)
@@ -534,6 +576,7 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
 		operation        pool.Runnable
 		completionFuture pool.CompletionFuture
 		transactionLog   zerolog.Logger
+		completed        bool
 	}
 	type args struct {
 		err error
@@ -543,7 +586,7 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
 		fields    fields
 		args      args
 		mockSetup func(t *testing.T, fields *fields, args *args)
-		wantErr   bool
+		wantErr   assert.ErrorAssertionFunc
 	}{
 		{
 			name: "just fail it",
@@ -556,24 +599,34 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
 				expect.Cancel(true, nil).Return()
 				fields.completionFuture = completionFuture
 			},
-			wantErr: true,
+			wantErr: assert.Error,
+		},
+		{
+			name: "just fail it (completed)",
+			args: args{
+				err: errors.New("nope"),
+			},
+			fields: fields{
+				parent:    &requestTransactionManager{},
+				completed: true,
+			},
+			wantErr: assert.Error,
 		},
 	}
 	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
+		t1.Run(tt.name, func(t *testing.T) {
 			if tt.mockSetup != nil {
-				tt.mockSetup(t1, &tt.fields, &tt.args)
+				tt.mockSetup(t, &tt.fields, &tt.args)
 			}
-			t := &requestTransaction{
+			r := &requestTransaction{
 				parent:           tt.fields.parent,
 				transactionId:    tt.fields.transactionId,
 				operation:        tt.fields.operation,
 				completionFuture: tt.fields.completionFuture,
 				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
 			}
-			if err := t.FailRequest(tt.args.err); (err != nil) != tt.wantErr {
-				t1.Errorf("FailRequest() error = %v, wantErr %v", err, tt.wantErr)
-			}
+			tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err)
 		})
 	}
 }
@@ -619,6 +672,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 		operation        pool.Runnable
 		completionFuture pool.CompletionFuture
 		transactionLog   zerolog.Logger
+		completed        bool
 	}
 	type args struct {
 		operation RequestTransactionRunnable
@@ -653,6 +707,21 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 				},
 			},
 		},
+		{
+			name: "submit completed",
+			fields: fields{
+				parent: &requestTransactionManager{},
+				operation: func() {
+					// NOOP
+				},
+				completed: true,
+			},
+			args: args{
+				operation: func(_ RequestTransaction) {
+					// NOOP
+				},
+			},
+		},
 	}
 	for _, tt := range tests {
 		t1.Run(tt.name, func(t1 *testing.T) {
@@ -662,9 +731,137 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 				operation:        tt.fields.operation,
 				completionFuture: tt.fields.completionFuture,
 				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
 			}
 			t.Submit(tt.args.operation)
 			t.operation()
 		})
 	}
 }
+
+func Test_requestTransactionManager_Close(t *testing.T) {
+	type fields struct {
+		runningRequests                     []*requestTransaction
+		numberOfConcurrentRequests          int
+		transactionId                       int32
+		workLog                             list.List
+		executor                            pool.Executor
+		shutdown                            bool
+		traceTransactionManagerTransactions bool
+		log                                 zerolog.Logger
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		setup   func(t *testing.T, fields *fields)
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{
+			name: "close it",
+			setup: func(t *testing.T, fields *fields) {
+				executor := NewMockExecutor(t)
+				executor.EXPECT().Close().Return(nil)
+				fields.executor = executor
+			},
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			r := &requestTransactionManager{
+				runningRequests:                     tt.fields.runningRequests,
+				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
+				transactionId:                       tt.fields.transactionId,
+				workLog:                             tt.fields.workLog,
+				executor:                            tt.fields.executor,
+				shutdown:                            tt.fields.shutdown,
+				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+				log:                                 tt.fields.log,
+			}
+			tt.wantErr(t, r.Close(), fmt.Sprintf("Close()"))
+		})
+	}
+}
+
+func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
+	type fields struct {
+		runningRequests                     []*requestTransaction
+		numberOfConcurrentRequests          int
+		transactionId                       int32
+		workLog                             list.List
+		executor                            pool.Executor
+		shutdown                            bool
+		traceTransactionManagerTransactions bool
+		log                                 zerolog.Logger
+	}
+	type args struct {
+		timeout time.Duration
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		setup   func(t *testing.T, fields *fields)
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{
+			name: "close it",
+			setup: func(t *testing.T, fields *fields) {
+				executor := NewMockExecutor(t)
+				executor.EXPECT().Close().Return(nil)
+				fields.executor = executor
+			},
+			wantErr: assert.NoError,
+		},
+		{
+			name: "close it with timeout",
+			args: args{
+				timeout: 20 * time.Millisecond,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				executor := NewMockExecutor(t)
+				executor.EXPECT().Close().Return(nil)
+				fields.executor = executor
+			},
+			wantErr: assert.NoError,
+		},
+		{
+			name: "close it with timeout fires",
+			fields: fields{
+				runningRequests: []*requestTransaction{
+					{},
+				},
+			},
+			args: args{
+				timeout: 20 * time.Millisecond,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				executor := NewMockExecutor(t)
+				executor.EXPECT().Close().Return(nil)
+				fields.executor = executor
+			},
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			r := &requestTransactionManager{
+				runningRequests:                     tt.fields.runningRequests,
+				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
+				transactionId:                       tt.fields.transactionId,
+				workLog:                             tt.fields.workLog,
+				executor:                            tt.fields.executor,
+				shutdown:                            tt.fields.shutdown,
+				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+				log:                                 tt.fields.log,
+			}
+			tt.wantErr(t, r.CloseGraceful(tt.args.timeout), fmt.Sprintf("CloseGraceful(%v)", tt.args.timeout))
+		})
+	}
+}
diff --git a/plc4go/spi/transactions/mock_Executor_test.go b/plc4go/spi/transactions/mock_Executor_test.go
new file mode 100644
index 0000000000..526abda3d4
--- /dev/null
+++ b/plc4go/spi/transactions/mock_Executor_test.go
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Code generated by mockery v2.28.1. DO NOT EDIT.
+
+package transactions
+
+import (
+	context "context"
+
+	pool "github.com/apache/plc4x/plc4go/spi/pool"
+	mock "github.com/stretchr/testify/mock"
+)
+
+// MockExecutor is an autogenerated mock type for the Executor type
+type MockExecutor struct {
+	mock.Mock
+}
+
+type MockExecutor_Expecter struct {
+	mock *mock.Mock
+}
+
+func (_m *MockExecutor) EXPECT() *MockExecutor_Expecter {
+	return &MockExecutor_Expecter{mock: &_m.Mock}
+}
+
+// Close provides a mock function with given fields:
+func (_m *MockExecutor) Close() error {
+	ret := _m.Called()
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func() error); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// MockExecutor_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
+type MockExecutor_Close_Call struct {
+	*mock.Call
+}
+
+// Close is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Close() *MockExecutor_Close_Call {
+	return &MockExecutor_Close_Call{Call: _e.mock.On("Close")}
+}
+
+func (_c *MockExecutor_Close_Call) Run(run func()) *MockExecutor_Close_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExecutor_Close_Call) Return(_a0 error) *MockExecutor_Close_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExecutor_Close_Call) RunAndReturn(run func() error) *MockExecutor_Close_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// IsRunning provides a mock function with given fields:
+func (_m *MockExecutor) IsRunning() bool {
+	ret := _m.Called()
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func() bool); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockExecutor_IsRunning_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsRunning'
+type MockExecutor_IsRunning_Call struct {
+	*mock.Call
+}
+
+// IsRunning is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) IsRunning() *MockExecutor_IsRunning_Call {
+	return &MockExecutor_IsRunning_Call{Call: _e.mock.On("IsRunning")}
+}
+
+func (_c *MockExecutor_IsRunning_Call) Run(run func()) *MockExecutor_IsRunning_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExecutor_IsRunning_Call) Return(_a0 bool) *MockExecutor_IsRunning_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExecutor_IsRunning_Call) RunAndReturn(run func() bool) *MockExecutor_IsRunning_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Start provides a mock function with given fields:
+func (_m *MockExecutor) Start() {
+	_m.Called()
+}
+
+// MockExecutor_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
+type MockExecutor_Start_Call struct {
+	*mock.Call
+}
+
+// Start is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Start() *MockExecutor_Start_Call {
+	return &MockExecutor_Start_Call{Call: _e.mock.On("Start")}
+}
+
+func (_c *MockExecutor_Start_Call) Run(run func()) *MockExecutor_Start_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExecutor_Start_Call) Return() *MockExecutor_Start_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockExecutor_Start_Call) RunAndReturn(run func()) *MockExecutor_Start_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Stop provides a mock function with given fields:
+func (_m *MockExecutor) Stop() {
+	_m.Called()
+}
+
+// MockExecutor_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
+type MockExecutor_Stop_Call struct {
+	*mock.Call
+}
+
+// Stop is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Stop() *MockExecutor_Stop_Call {
+	return &MockExecutor_Stop_Call{Call: _e.mock.On("Stop")}
+}
+
+func (_c *MockExecutor_Stop_Call) Run(run func()) *MockExecutor_Stop_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExecutor_Stop_Call) Return() *MockExecutor_Stop_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockExecutor_Stop_Call) RunAndReturn(run func()) *MockExecutor_Stop_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Submit provides a mock function with given fields: ctx, workItemId, runnable
+func (_m *MockExecutor) Submit(ctx context.Context, workItemId int32, runnable pool.Runnable) pool.CompletionFuture {
+	ret := _m.Called(ctx, workItemId, runnable)
+
+	var r0 pool.CompletionFuture
+	if rf, ok := ret.Get(0).(func(context.Context, int32, pool.Runnable) pool.CompletionFuture); ok {
+		r0 = rf(ctx, workItemId, runnable)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(pool.CompletionFuture)
+		}
+	}
+
+	return r0
+}
+
+// MockExecutor_Submit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Submit'
+type MockExecutor_Submit_Call struct {
+	*mock.Call
+}
+
+// Submit is a helper method to define mock.On call
+//   - ctx context.Context
+//   - workItemId int32
+//   - runnable pool.Runnable
+func (_e *MockExecutor_Expecter) Submit(ctx interface{}, workItemId interface{}, runnable interface{}) *MockExecutor_Submit_Call {
+	return &MockExecutor_Submit_Call{Call: _e.mock.On("Submit", ctx, workItemId, runnable)}
+}
+
+func (_c *MockExecutor_Submit_Call) Run(run func(ctx context.Context, workItemId int32, runnable pool.Runnable)) *MockExecutor_Submit_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(int32), args[2].(pool.Runnable))
+	})
+	return _c
+}
+
+func (_c *MockExecutor_Submit_Call) Return(_a0 pool.CompletionFuture) *MockExecutor_Submit_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExecutor_Submit_Call) RunAndReturn(run func(context.Context, int32, pool.Runnable) pool.CompletionFuture) *MockExecutor_Submit_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+type mockConstructorTestingTNewMockExecutor interface {
+	mock.TestingT
+	Cleanup(func())
+}
+
+// NewMockExecutor creates a new instance of MockExecutor. It also registers a testing interface on the mock and a cleanup function to assert the mock's expectations.
+func NewMockExecutor(t mockConstructorTestingTNewMockExecutor) *MockExecutor {
+	mock := &MockExecutor{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/plc4go/spi/transactions/mock_RequestTransaction_test.go b/plc4go/spi/transactions/mock_RequestTransaction_test.go
index 2c8e24ec20..a177a118a3 100644
--- a/plc4go/spi/transactions/mock_RequestTransaction_test.go
+++ b/plc4go/spi/transactions/mock_RequestTransaction_test.go
@@ -165,6 +165,47 @@ func (_c *MockRequestTransaction_FailRequest_Call) RunAndReturn(run func(error)
 	return _c
 }
 
+// IsCompleted provides a mock function with given fields:
+func (_m *MockRequestTransaction) IsCompleted() bool {
+	ret := _m.Called()
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func() bool); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockRequestTransaction_IsCompleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCompleted'
+type MockRequestTransaction_IsCompleted_Call struct {
+	*mock.Call
+}
+
+// IsCompleted is a helper method to define mock.On call
+func (_e *MockRequestTransaction_Expecter) IsCompleted() *MockRequestTransaction_IsCompleted_Call {
+	return &MockRequestTransaction_IsCompleted_Call{Call: _e.mock.On("IsCompleted")}
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) Run(run func()) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) Return(_a0 bool) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) RunAndReturn(run func() bool) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // String provides a mock function with given fields:
 func (_m *MockRequestTransaction) String() string {
 	ret := _m.Called()
diff --git a/plc4go/spi/transactions/mock_requirements.go b/plc4go/spi/transactions/mock_requirements.go
index 64f8171934..86b5f351e3 100644
--- a/plc4go/spi/transactions/mock_requirements.go
+++ b/plc4go/spi/transactions/mock_requirements.go
@@ -29,3 +29,8 @@ import (
 type CompletionFuture interface {
 	pool.CompletionFuture
 }
+
+// Deprecated: don't use it in production code
+type Executor interface {
+	pool.Executor
+}