You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 14:03:54 UTC

[plc4x] 02/02: fix(plc4go/spi): harden request transaction manager implementation

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit b9c89ebea3d57a1153919e05cb947d61c12f282e
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 16:03:43 2023 +0200

    fix(plc4go/spi): harden request transaction manager implementation
---
 .../internal/cbus/mock_RequestTransaction_test.go  |  41 ++++
 .../spi/transactions/RequestTransactionManager.go  |  63 ++++-
 .../transactions/RequestTransactionManager_test.go | 253 ++++++++++++++++++---
 plc4go/spi/transactions/mock_Executor_test.go      | 249 ++++++++++++++++++++
 .../transactions/mock_RequestTransaction_test.go   |  41 ++++
 plc4go/spi/transactions/mock_requirements.go       |   5 +
 6 files changed, 613 insertions(+), 39 deletions(-)

diff --git a/plc4go/internal/cbus/mock_RequestTransaction_test.go b/plc4go/internal/cbus/mock_RequestTransaction_test.go
index ae6861665b..6e81fdd880 100644
--- a/plc4go/internal/cbus/mock_RequestTransaction_test.go
+++ b/plc4go/internal/cbus/mock_RequestTransaction_test.go
@@ -166,6 +166,47 @@ func (_c *MockRequestTransaction_FailRequest_Call) RunAndReturn(run func(error)
 	return _c
 }
 
+// IsCompleted provides a mock function with given fields:
+func (_m *MockRequestTransaction) IsCompleted() bool {
+	ret := _m.Called()
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func() bool); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockRequestTransaction_IsCompleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCompleted'
+type MockRequestTransaction_IsCompleted_Call struct {
+	*mock.Call
+}
+
+// IsCompleted is a helper method to define mock.On call
+func (_e *MockRequestTransaction_Expecter) IsCompleted() *MockRequestTransaction_IsCompleted_Call {
+	return &MockRequestTransaction_IsCompleted_Call{Call: _e.mock.On("IsCompleted")}
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) Run(run func()) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) Return(_a0 bool) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) RunAndReturn(run func() bool) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // String provides a mock function with given fields:
 func (_m *MockRequestTransaction) String() string {
 	ret := _m.Called()
diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index edd287c6fa..c06e0de71e 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -35,7 +35,6 @@ import (
 
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
-	"github.com/rs/zerolog/log"
 )
 
 var sharedExecutorInstance pool.Executor // shared instance
@@ -58,6 +57,8 @@ type RequestTransaction interface {
 	Submit(operation RequestTransactionRunnable)
 	// AwaitCompletion wait for this RequestTransaction to finish. Returns an error if it finished unsuccessful
 	AwaitCompletion(ctx context.Context) error
+	// IsCompleted indicates that the that this RequestTransaction is completed
+	IsCompleted() bool
 }
 
 // RequestTransactionManager handles transactions
@@ -79,6 +80,8 @@ func NewRequestTransactionManager(numberOfConcurrentRequests int, _options ...op
 		workLog:                    *list.New(),
 		executor:                   sharedExecutorInstance,
 
+		traceTransactionManagerTransactions: config.TraceTransactionManagerTransactions,
+
 		log: options.ExtractCustomLogger(_options...),
 	}
 	for _, option := range _options {
@@ -114,6 +117,9 @@ type requestTransaction struct {
 	operation        pool.Runnable
 	completionFuture pool.CompletionFuture
 
+	stateChangeMutex sync.Mutex
+	completed        bool
+
 	transactionLog zerolog.Logger
 }
 
@@ -129,8 +135,12 @@ type requestTransactionManager struct {
 	workLogMutex sync.RWMutex
 	executor     pool.Executor
 
+	// Indicates it this rtm is in shutdown
 	shutdown bool
 
+	// flag set to true if it should trace transactions
+	traceTransactionManagerTransactions bool
+
 	log zerolog.Logger
 }
 
@@ -179,26 +189,35 @@ func (r *requestTransactionManager) processWorklog() {
 	}
 }
 
+type completedFuture struct {
+	err error
+}
+
+func (c completedFuture) AwaitCompletion(_ context.Context) error {
+	return c.err
+}
+
+func (completedFuture) Cancel(_ bool, _ error) {
+	// No op
+}
+
 func (r *requestTransactionManager) StartTransaction() RequestTransaction {
 	r.transactionMutex.Lock()
 	defer r.transactionMutex.Unlock()
 	currentTransactionId := r.transactionId
 	r.transactionId += 1
-	transactionLogger := log.With().Int32("transactionId", currentTransactionId).Logger()
-	if !config.TraceTransactionManagerTransactions {
+	transactionLogger := r.log.With().Int32("transactionId", currentTransactionId).Logger()
+	if !r.traceTransactionManagerTransactions {
 		transactionLogger = zerolog.Nop()
 	}
 	transaction := &requestTransaction{
-		r,
-		currentTransactionId,
-		nil,
-		nil,
-		transactionLogger,
+		parent:         r,
+		transactionId:  currentTransactionId,
+		transactionLog: transactionLogger,
 	}
 	if r.shutdown {
-		if err := r.failRequest(transaction, errors.New("request transaction manager in shutdown")); err != nil {
-			r.log.Error().Err(err).Msg("error shutting down transaction")
-		}
+		transaction.completed = true
+		transaction.completionFuture = completedFuture{errors.New("request transaction manager in shutdown")}
 	}
 	return transaction
 }
@@ -271,17 +290,35 @@ func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 }
 
 func (t *requestTransaction) FailRequest(err error) error {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		return errors.Wrap(err, "calling fail on a already completed transaction")
+	}
 	t.transactionLog.Trace().Msg("Fail the request")
+	t.completed = true
 	return t.parent.failRequest(t, err)
 }
 
 func (t *requestTransaction) EndRequest() error {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		return errors.New("calling end on a already completed transaction")
+	}
 	t.transactionLog.Trace().Msg("Ending the request")
+	t.completed = true
 	// Remove it from Running Requests
 	return t.parent.endRequest(t)
 }
 
 func (t *requestTransaction) Submit(operation RequestTransactionRunnable) {
+	t.stateChangeMutex.Lock()
+	defer t.stateChangeMutex.Unlock()
+	if t.completed {
+		t.transactionLog.Warn().Msg("calling submit on a already completed transaction")
+		return
+	}
 	if t.operation != nil {
 		t.transactionLog.Warn().Msg("Operation already set")
 	}
@@ -315,6 +352,10 @@ func (t *requestTransaction) AwaitCompletion(ctx context.Context) error {
 	return nil
 }
 
+func (t *requestTransaction) IsCompleted() bool {
+	return t.completed
+}
+
 func (t *requestTransaction) String() string {
 	return fmt.Sprintf("Transaction{tid:%d}", t.transactionId)
 }
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index ae71055f07..7384f87871 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -22,8 +22,11 @@ package transactions
 import (
 	"container/list"
 	"context"
+	"fmt"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
+	"github.com/apache/plc4x/plc4go/spi/testutils"
+	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
@@ -145,38 +148,63 @@ func Test_requestTransactionManager_SetNumberOfConcurrentRequests(t *testing.T)
 
 func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 	type fields struct {
-		runningRequests            []*requestTransaction
-		numberOfConcurrentRequests int
-		transactionId              int32
-		workLog                    list.List
-		executor                   pool.Executor
+		runningRequests                     []*requestTransaction
+		numberOfConcurrentRequests          int
+		transactionId                       int32
+		workLog                             list.List
+		executor                            pool.Executor
+		shutdown                            bool
+		traceTransactionManagerTransactions bool
+		log                                 zerolog.Logger
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		want   RequestTransaction
+		name       string
+		fields     fields
+		setup      func(t *testing.T, fields *fields)
+		wantAssert func(t *testing.T, requestTransaction RequestTransaction) bool
 	}{
 		{
 			name: "start one",
-			want: &requestTransaction{
-				parent: &requestTransactionManager{
-					transactionId: 1,
-				},
-				transactionLog: zerolog.Nop(),
+			setup: func(t *testing.T, fields *fields) {
+				fields.log = testutils.ProduceTestingLogger(t)
+			},
+			wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
+				assert.False(t, requestTransaction.IsCompleted())
+				return true
+			},
+		},
+		{
+			name: "start one in shutdown",
+			fields: fields{
+				shutdown: true,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				fields.log = testutils.ProduceTestingLogger(t)
+			},
+			wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
+				assert.True(t, requestTransaction.IsCompleted())
+				assert.Error(t, requestTransaction.AwaitCompletion(context.Background()))
+				return true
 			},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
 			r := &requestTransactionManager{
-				runningRequests:            tt.fields.runningRequests,
-				numberOfConcurrentRequests: tt.fields.numberOfConcurrentRequests,
-				transactionId:              tt.fields.transactionId,
-				workLog:                    tt.fields.workLog,
-				executor:                   tt.fields.executor,
+				runningRequests:                     tt.fields.runningRequests,
+				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
+				transactionId:                       tt.fields.transactionId,
+				workLog:                             tt.fields.workLog,
+				executor:                            tt.fields.executor,
+				shutdown:                            tt.fields.shutdown,
+				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+				log:                                 tt.fields.log,
 			}
-			if got := r.StartTransaction(); !assert.Equal(t, tt.want, got) {
-				t.Errorf("StartTransaction() = %v, want %v", got, tt.want)
+			if got := r.StartTransaction(); !assert.True(t, tt.wantAssert(t, got)) {
+				t.Errorf("StartTransaction() = %v", got)
 			}
 		})
 	}
@@ -241,6 +269,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 		transactionId              int32
 		workLog                    list.List
 		executor                   pool.Executor
+		log                        zerolog.Logger
 	}
 	type args struct {
 		transaction *requestTransaction
@@ -259,6 +288,8 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 				transaction: &requestTransaction{},
 			},
 			mockSetup: func(t *testing.T, fields *fields, args *args) {
+				fields.log = testutils.ProduceTestingLogger(t)
+
 				completionFuture := NewMockCompletionFuture(t)
 				expect := completionFuture.EXPECT()
 				expect.Cancel(true, nil).Return()
@@ -278,6 +309,7 @@ func Test_requestTransactionManager_failRequest(t *testing.T) {
 				transactionId:              tt.fields.transactionId,
 				workLog:                    tt.fields.workLog,
 				executor:                   tt.fields.executor,
+				log:                        tt.fields.log,
 			}
 			if err := r.failRequest(tt.args.transaction, tt.args.err); (err != nil) != tt.wantErr {
 				t.Errorf("failRequest() error = %v, wantErr %v", err, tt.wantErr)
@@ -497,6 +529,7 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
 		operation        pool.Runnable
 		completionFuture pool.CompletionFuture
 		transactionLog   zerolog.Logger
+		completed        bool
 	}
 	tests := []struct {
 		name    string
@@ -510,6 +543,14 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
 			},
 			wantErr: true,
 		},
+		{
+			name: "end it completed",
+			fields: fields{
+				parent:    &requestTransactionManager{},
+				completed: true,
+			},
+			wantErr: true,
+		},
 	}
 	for _, tt := range tests {
 		t1.Run(tt.name, func(t1 *testing.T) {
@@ -519,6 +560,7 @@ func Test_requestTransaction_EndRequest(t1 *testing.T) {
 				operation:        tt.fields.operation,
 				completionFuture: tt.fields.completionFuture,
 				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
 			}
 			if err := t.EndRequest(); (err != nil) != tt.wantErr {
 				t1.Errorf("EndRequest() error = %v, wantErr %v", err, tt.wantErr)
@@ -534,6 +576,7 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
 		operation        pool.Runnable
 		completionFuture pool.CompletionFuture
 		transactionLog   zerolog.Logger
+		completed        bool
 	}
 	type args struct {
 		err error
@@ -543,7 +586,7 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
 		fields    fields
 		args      args
 		mockSetup func(t *testing.T, fields *fields, args *args)
-		wantErr   bool
+		wantErr   assert.ErrorAssertionFunc
 	}{
 		{
 			name: "just fail it",
@@ -556,24 +599,34 @@ func Test_requestTransaction_FailRequest(t1 *testing.T) {
 				expect.Cancel(true, nil).Return()
 				fields.completionFuture = completionFuture
 			},
-			wantErr: true,
+			wantErr: assert.Error,
+		},
+		{
+			name: "just fail it (completed)",
+			args: args{
+				err: errors.New("nope"),
+			},
+			fields: fields{
+				parent:    &requestTransactionManager{},
+				completed: true,
+			},
+			wantErr: assert.Error,
 		},
 	}
 	for _, tt := range tests {
-		t1.Run(tt.name, func(t1 *testing.T) {
+		t1.Run(tt.name, func(t *testing.T) {
 			if tt.mockSetup != nil {
-				tt.mockSetup(t1, &tt.fields, &tt.args)
+				tt.mockSetup(t, &tt.fields, &tt.args)
 			}
-			t := &requestTransaction{
+			r := &requestTransaction{
 				parent:           tt.fields.parent,
 				transactionId:    tt.fields.transactionId,
 				operation:        tt.fields.operation,
 				completionFuture: tt.fields.completionFuture,
 				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
 			}
-			if err := t.FailRequest(tt.args.err); (err != nil) != tt.wantErr {
-				t1.Errorf("FailRequest() error = %v, wantErr %v", err, tt.wantErr)
-			}
+			tt.wantErr(t, r.FailRequest(tt.args.err), "FailRequest() error = %v", tt.args.err)
 		})
 	}
 }
@@ -619,6 +672,7 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 		operation        pool.Runnable
 		completionFuture pool.CompletionFuture
 		transactionLog   zerolog.Logger
+		completed        bool
 	}
 	type args struct {
 		operation RequestTransactionRunnable
@@ -653,6 +707,21 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 				},
 			},
 		},
+		{
+			name: "submit completed",
+			fields: fields{
+				parent: &requestTransactionManager{},
+				operation: func() {
+					// NOOP
+				},
+				completed: true,
+			},
+			args: args{
+				operation: func(_ RequestTransaction) {
+					// NOOP
+				},
+			},
+		},
 	}
 	for _, tt := range tests {
 		t1.Run(tt.name, func(t1 *testing.T) {
@@ -662,9 +731,137 @@ func Test_requestTransaction_Submit(t1 *testing.T) {
 				operation:        tt.fields.operation,
 				completionFuture: tt.fields.completionFuture,
 				transactionLog:   tt.fields.transactionLog,
+				completed:        tt.fields.completed,
 			}
 			t.Submit(tt.args.operation)
 			t.operation()
 		})
 	}
 }
+
+func Test_requestTransactionManager_Close(t *testing.T) {
+	type fields struct {
+		runningRequests                     []*requestTransaction
+		numberOfConcurrentRequests          int
+		transactionId                       int32
+		workLog                             list.List
+		executor                            pool.Executor
+		shutdown                            bool
+		traceTransactionManagerTransactions bool
+		log                                 zerolog.Logger
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		setup   func(t *testing.T, fields *fields)
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{
+			name: "close it",
+			setup: func(t *testing.T, fields *fields) {
+				executor := NewMockExecutor(t)
+				executor.EXPECT().Close().Return(nil)
+				fields.executor = executor
+			},
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			r := &requestTransactionManager{
+				runningRequests:                     tt.fields.runningRequests,
+				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
+				transactionId:                       tt.fields.transactionId,
+				workLog:                             tt.fields.workLog,
+				executor:                            tt.fields.executor,
+				shutdown:                            tt.fields.shutdown,
+				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+				log:                                 tt.fields.log,
+			}
+			tt.wantErr(t, r.Close(), fmt.Sprintf("Close()"))
+		})
+	}
+}
+
+func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
+	type fields struct {
+		runningRequests                     []*requestTransaction
+		numberOfConcurrentRequests          int
+		transactionId                       int32
+		workLog                             list.List
+		executor                            pool.Executor
+		shutdown                            bool
+		traceTransactionManagerTransactions bool
+		log                                 zerolog.Logger
+	}
+	type args struct {
+		timeout time.Duration
+	}
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		setup   func(t *testing.T, fields *fields)
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{
+			name: "close it",
+			setup: func(t *testing.T, fields *fields) {
+				executor := NewMockExecutor(t)
+				executor.EXPECT().Close().Return(nil)
+				fields.executor = executor
+			},
+			wantErr: assert.NoError,
+		},
+		{
+			name: "close it with timeout",
+			args: args{
+				timeout: 20 * time.Millisecond,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				executor := NewMockExecutor(t)
+				executor.EXPECT().Close().Return(nil)
+				fields.executor = executor
+			},
+			wantErr: assert.NoError,
+		},
+		{
+			name: "close it with timeout fires",
+			fields: fields{
+				runningRequests: []*requestTransaction{
+					{},
+				},
+			},
+			args: args{
+				timeout: 20 * time.Millisecond,
+			},
+			setup: func(t *testing.T, fields *fields) {
+				executor := NewMockExecutor(t)
+				executor.EXPECT().Close().Return(nil)
+				fields.executor = executor
+			},
+			wantErr: assert.NoError,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			r := &requestTransactionManager{
+				runningRequests:                     tt.fields.runningRequests,
+				numberOfConcurrentRequests:          tt.fields.numberOfConcurrentRequests,
+				transactionId:                       tt.fields.transactionId,
+				workLog:                             tt.fields.workLog,
+				executor:                            tt.fields.executor,
+				shutdown:                            tt.fields.shutdown,
+				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
+				log:                                 tt.fields.log,
+			}
+			tt.wantErr(t, r.CloseGraceful(tt.args.timeout), fmt.Sprintf("CloseGraceful(%v)", tt.args.timeout))
+		})
+	}
+}
diff --git a/plc4go/spi/transactions/mock_Executor_test.go b/plc4go/spi/transactions/mock_Executor_test.go
new file mode 100644
index 0000000000..526abda3d4
--- /dev/null
+++ b/plc4go/spi/transactions/mock_Executor_test.go
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+// Code generated by mockery v2.28.1. DO NOT EDIT.
+
+package transactions
+
+import (
+	context "context"
+
+	pool "github.com/apache/plc4x/plc4go/spi/pool"
+	mock "github.com/stretchr/testify/mock"
+)
+
+// MockExecutor is an autogenerated mock type for the Executor type
+type MockExecutor struct {
+	mock.Mock
+}
+
+type MockExecutor_Expecter struct {
+	mock *mock.Mock
+}
+
+func (_m *MockExecutor) EXPECT() *MockExecutor_Expecter {
+	return &MockExecutor_Expecter{mock: &_m.Mock}
+}
+
+// Close provides a mock function with given fields:
+func (_m *MockExecutor) Close() error {
+	ret := _m.Called()
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func() error); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// MockExecutor_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close'
+type MockExecutor_Close_Call struct {
+	*mock.Call
+}
+
+// Close is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Close() *MockExecutor_Close_Call {
+	return &MockExecutor_Close_Call{Call: _e.mock.On("Close")}
+}
+
+func (_c *MockExecutor_Close_Call) Run(run func()) *MockExecutor_Close_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExecutor_Close_Call) Return(_a0 error) *MockExecutor_Close_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExecutor_Close_Call) RunAndReturn(run func() error) *MockExecutor_Close_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// IsRunning provides a mock function with given fields:
+func (_m *MockExecutor) IsRunning() bool {
+	ret := _m.Called()
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func() bool); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockExecutor_IsRunning_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsRunning'
+type MockExecutor_IsRunning_Call struct {
+	*mock.Call
+}
+
+// IsRunning is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) IsRunning() *MockExecutor_IsRunning_Call {
+	return &MockExecutor_IsRunning_Call{Call: _e.mock.On("IsRunning")}
+}
+
+func (_c *MockExecutor_IsRunning_Call) Run(run func()) *MockExecutor_IsRunning_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExecutor_IsRunning_Call) Return(_a0 bool) *MockExecutor_IsRunning_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExecutor_IsRunning_Call) RunAndReturn(run func() bool) *MockExecutor_IsRunning_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Start provides a mock function with given fields:
+func (_m *MockExecutor) Start() {
+	_m.Called()
+}
+
+// MockExecutor_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
+type MockExecutor_Start_Call struct {
+	*mock.Call
+}
+
+// Start is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Start() *MockExecutor_Start_Call {
+	return &MockExecutor_Start_Call{Call: _e.mock.On("Start")}
+}
+
+func (_c *MockExecutor_Start_Call) Run(run func()) *MockExecutor_Start_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExecutor_Start_Call) Return() *MockExecutor_Start_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockExecutor_Start_Call) RunAndReturn(run func()) *MockExecutor_Start_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Stop provides a mock function with given fields:
+func (_m *MockExecutor) Stop() {
+	_m.Called()
+}
+
+// MockExecutor_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
+type MockExecutor_Stop_Call struct {
+	*mock.Call
+}
+
+// Stop is a helper method to define mock.On call
+func (_e *MockExecutor_Expecter) Stop() *MockExecutor_Stop_Call {
+	return &MockExecutor_Stop_Call{Call: _e.mock.On("Stop")}
+}
+
+func (_c *MockExecutor_Stop_Call) Run(run func()) *MockExecutor_Stop_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockExecutor_Stop_Call) Return() *MockExecutor_Stop_Call {
+	_c.Call.Return()
+	return _c
+}
+
+func (_c *MockExecutor_Stop_Call) RunAndReturn(run func()) *MockExecutor_Stop_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// Submit provides a mock function with given fields: ctx, workItemId, runnable
+func (_m *MockExecutor) Submit(ctx context.Context, workItemId int32, runnable pool.Runnable) pool.CompletionFuture {
+	ret := _m.Called(ctx, workItemId, runnable)
+
+	var r0 pool.CompletionFuture
+	if rf, ok := ret.Get(0).(func(context.Context, int32, pool.Runnable) pool.CompletionFuture); ok {
+		r0 = rf(ctx, workItemId, runnable)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(pool.CompletionFuture)
+		}
+	}
+
+	return r0
+}
+
+// MockExecutor_Submit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Submit'
+type MockExecutor_Submit_Call struct {
+	*mock.Call
+}
+
+// Submit is a helper method to define mock.On call
+//   - ctx context.Context
+//   - workItemId int32
+//   - runnable pool.Runnable
+func (_e *MockExecutor_Expecter) Submit(ctx interface{}, workItemId interface{}, runnable interface{}) *MockExecutor_Submit_Call {
+	return &MockExecutor_Submit_Call{Call: _e.mock.On("Submit", ctx, workItemId, runnable)}
+}
+
+func (_c *MockExecutor_Submit_Call) Run(run func(ctx context.Context, workItemId int32, runnable pool.Runnable)) *MockExecutor_Submit_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(int32), args[2].(pool.Runnable))
+	})
+	return _c
+}
+
+func (_c *MockExecutor_Submit_Call) Return(_a0 pool.CompletionFuture) *MockExecutor_Submit_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockExecutor_Submit_Call) RunAndReturn(run func(context.Context, int32, pool.Runnable) pool.CompletionFuture) *MockExecutor_Submit_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+type mockConstructorTestingTNewMockExecutor interface {
+	mock.TestingT
+	Cleanup(func())
+}
+
+// NewMockExecutor creates a new instance of MockExecutor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+func NewMockExecutor(t mockConstructorTestingTNewMockExecutor) *MockExecutor {
+	mock := &MockExecutor{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/plc4go/spi/transactions/mock_RequestTransaction_test.go b/plc4go/spi/transactions/mock_RequestTransaction_test.go
index 2c8e24ec20..a177a118a3 100644
--- a/plc4go/spi/transactions/mock_RequestTransaction_test.go
+++ b/plc4go/spi/transactions/mock_RequestTransaction_test.go
@@ -165,6 +165,47 @@ func (_c *MockRequestTransaction_FailRequest_Call) RunAndReturn(run func(error)
 	return _c
 }
 
+// IsCompleted provides a mock function with given fields:
+func (_m *MockRequestTransaction) IsCompleted() bool {
+	ret := _m.Called()
+
+	var r0 bool
+	if rf, ok := ret.Get(0).(func() bool); ok {
+		r0 = rf()
+	} else {
+		r0 = ret.Get(0).(bool)
+	}
+
+	return r0
+}
+
+// MockRequestTransaction_IsCompleted_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsCompleted'
+type MockRequestTransaction_IsCompleted_Call struct {
+	*mock.Call
+}
+
+// IsCompleted is a helper method to define mock.On call
+func (_e *MockRequestTransaction_Expecter) IsCompleted() *MockRequestTransaction_IsCompleted_Call {
+	return &MockRequestTransaction_IsCompleted_Call{Call: _e.mock.On("IsCompleted")}
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) Run(run func()) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run()
+	})
+	return _c
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) Return(_a0 bool) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockRequestTransaction_IsCompleted_Call) RunAndReturn(run func() bool) *MockRequestTransaction_IsCompleted_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // String provides a mock function with given fields:
 func (_m *MockRequestTransaction) String() string {
 	ret := _m.Called()
diff --git a/plc4go/spi/transactions/mock_requirements.go b/plc4go/spi/transactions/mock_requirements.go
index 64f8171934..86b5f351e3 100644
--- a/plc4go/spi/transactions/mock_requirements.go
+++ b/plc4go/spi/transactions/mock_requirements.go
@@ -29,3 +29,8 @@ import (
 type CompletionFuture interface {
 	pool.CompletionFuture
 }
+
+// Deprecated: don't use it in productive code
+type Executor interface {
+	pool.Executor
+}