You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/31 12:54:56 UTC

[plc4x] 02/03: refactor(plc4go/spi): move WorkerPool to own package

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit c4bf3ebd6206d470ab570c4d67710b95fe153c42
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed May 31 14:49:15 2023 +0200

    refactor(plc4go/spi): move WorkerPool to own package
---
 plc4go/spi/{utils => pool}/WorkerPool.go      |  82 ++++++----
 plc4go/spi/{utils => pool}/WorkerPool_test.go |  55 ++++---
 plc4go/spi/utils/mock_ExecutorOption_test.go  |  85 -----------
 plc4go/spi/utils/mock_Executor_test.go        | 207 --------------------------
 4 files changed, 82 insertions(+), 347 deletions(-)

diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
similarity index 80%
rename from plc4go/spi/utils/WorkerPool.go
rename to plc4go/spi/pool/WorkerPool.go
index a1d8202a7c..ae1f59849f 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -17,14 +17,16 @@
  * under the License.
  */
 
-package utils
+package pool
 
 import (
 	"context"
 	"fmt"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"
 	"github.com/rs/zerolog/log"
+	"io"
 	"runtime/debug"
 	"sync"
 	"sync/atomic"
@@ -41,6 +43,8 @@ type worker struct {
 	executor     *executor
 	hasEnded     atomic.Bool
 	lastReceived time.Time
+
+	log zerolog.Logger
 }
 
 func (w *worker) initialize() {
@@ -54,7 +58,7 @@ func (w *worker) initialize() {
 func (w *worker) work() {
 	defer func() {
 		if recovered := recover(); recovered != nil {
-			log.Error().Msgf("Recovering from panic():%v. Stack: %s", recovered, debug.Stack())
+			w.log.Error().Msgf("Recovering from panic():%v. Stack: %s", recovered, debug.Stack())
 		}
 		if !w.shutdown.Load() {
 			// if we are not in shutdown we continue
@@ -72,7 +76,7 @@ func (w *worker) work() {
 	for !w.shutdown.Load() {
 		workerLog.Debug().Msg("Working")
 		select {
-		case _workItem := <-w.executor.queue:
+		case _workItem := <-w.executor.workItems:
 			w.lastReceived = time.Now()
 			workerLog.Debug().Msgf("Got work item %v", _workItem)
 			if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
@@ -103,6 +107,7 @@ func (w *workItem) String() string {
 }
 
 type Executor interface {
+	io.Closer
 	Start()
 	Stop()
 	Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture
@@ -115,24 +120,32 @@ type executor struct {
 	shutdown           bool
 	stateChange        sync.Mutex
 	worker             []*worker
-	queue              chan workItem
+	workItems          chan workItem
 	traceWorkers       bool
+
+	log zerolog.Logger
 }
 
-func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options ...ExecutorOption) Executor {
+func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
 	workers := make([]*worker, numberOfWorkers)
+	customLogger := options.ExtractCustomLogger(_options...)
 	for i := 0; i < numberOfWorkers; i++ {
 		workers[i] = &worker{
-			id: i,
+			id:  i,
+			log: customLogger,
 		}
 	}
 	_executor := &executor{
 		maxNumberOfWorkers: numberOfWorkers,
-		queue:              make(chan workItem, queueDepth),
+		workItems:          make(chan workItem, queueDepth),
 		worker:             workers,
+		log:                customLogger,
 	}
-	for _, option := range options {
-		option(_executor)
+	for _, option := range _options {
+		switch option := option.(type) {
+		case *tracerWorkersOption:
+			_executor.traceWorkers = option.traceWorkers
+		}
 	}
 	for i := 0; i < numberOfWorkers; i++ {
 		workers[i].executor = _executor
@@ -144,14 +157,19 @@ var upScaleInterval = 100 * time.Millisecond
 var downScaleInterval = 5 * time.Second
 var timeToBecomeUnused = 5 * time.Second
 
-func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorOption) Executor {
+func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
+	customLogger := options.ExtractCustomLogger(_options...)
 	_executor := &executor{
 		maxNumberOfWorkers: maxNumberOfWorkers,
-		queue:              make(chan workItem, queueDepth),
+		workItems:          make(chan workItem, queueDepth),
 		worker:             make([]*worker, 0),
+		log:                customLogger,
 	}
-	for _, option := range options {
-		option(_executor)
+	for _, option := range _options {
+		switch option := option.(type) {
+		case *tracerWorkersOption:
+			_executor.traceWorkers = option.traceWorkers
+		}
 	}
 	// We spawn one initial worker
 	_executor.worker = append(_executor.worker, &worker{
@@ -159,13 +177,14 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
 		interrupter:  make(chan struct{}, 1),
 		executor:     _executor,
 		lastReceived: time.Now(),
+		log:          customLogger,
 	})
 	mutex := sync.Mutex{}
 	// Worker spawner
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
-				log.Error().Msgf("panic-ed %v", err)
+				customLogger.Error().Msgf("panic-ed %v", err)
 			}
 		}()
 		workerLog := log.With().Str("Worker type", "spawner").Logger()
@@ -176,7 +195,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
 			workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
 			time.Sleep(upScaleInterval)
 			mutex.Lock()
-			numberOfItemsInQueue := len(_executor.queue)
+			numberOfItemsInQueue := len(_executor.workItems)
 			numberOfWorkers := len(_executor.worker)
 			workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, maxNumberOfWorkers)
 			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < maxNumberOfWorkers {
@@ -185,6 +204,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
 					interrupter:  make(chan struct{}, 1),
 					executor:     _executor,
 					lastReceived: time.Now(),
+					log:          customLogger,
 				}
 				_executor.worker = append(_executor.worker, _worker)
 				_worker.initialize()
@@ -200,7 +220,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
 	go func() {
 		defer func() {
 			if err := recover(); err != nil {
-				log.Error().Msgf("panic-ed %v", err)
+				_executor.log.Error().Msgf("panic-ed %v", err)
 			}
 		}()
 		workerLog := log.With().Str("Worker type", "killer").Logger()
@@ -231,12 +251,13 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
 	return _executor
 }
 
-type ExecutorOption func(*executor)
+func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
+	return &tracerWorkersOption{traceWorkers: traceWorkers}
+}
 
-func WithExecutorOptionTracerWorkers(traceWorkers bool) ExecutorOption {
-	return func(executor *executor) {
-		executor.traceWorkers = traceWorkers
-	}
+type tracerWorkersOption struct {
+	options.Option
+	traceWorkers bool
 }
 
 func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
@@ -245,20 +266,24 @@ func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnab
 		value.Store(errors.New("runnable must not be nil"))
 		return &future{err: value}
 	}
-	log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
+	e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
 	completionFuture := &future{}
+	if e.shutdown {
+		completionFuture.Cancel(false, errors.New("executor in shutdown"))
+		return completionFuture
+	}
 	select {
-	case e.queue <- workItem{
+	case e.workItems <- workItem{
 		workItemId:       workItemId,
 		runnable:         runnable,
 		completionFuture: completionFuture,
 	}:
-		log.Trace().Msg("Item added")
+		e.log.Trace().Msg("Item added")
 	case <-ctx.Done():
 		completionFuture.Cancel(false, ctx.Err())
 	}
 
-	log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
+	e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
 	return completionFuture
 }
 
@@ -284,17 +309,22 @@ func (e *executor) Stop() {
 		return
 	}
 	e.shutdown = true
-	close(e.queue)
 	for i := 0; i < len(e.worker); i++ {
 		worker := e.worker[i]
 		worker.shutdown.Store(true)
 		worker.interrupted.Store(true)
 		close(worker.interrupter)
 	}
+	close(e.workItems)
 	e.running = false
 	e.shutdown = false
 }
 
+func (e *executor) Close() error {
+	e.Stop()
+	return nil
+}
+
 func (e *executor) IsRunning() bool {
 	return e.running && !e.shutdown
 }
diff --git a/plc4go/spi/utils/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
similarity index 93%
rename from plc4go/spi/utils/WorkerPool_test.go
rename to plc4go/spi/pool/WorkerPool_test.go
index fcb006b982..d940b1c545 100644
--- a/plc4go/spi/utils/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -17,11 +17,12 @@
  * under the License.
  */
 
-package utils
+package pool
 
 import (
 	"context"
 	"fmt"
+	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"math/rand"
@@ -68,7 +69,7 @@ func TestExecutor_Start(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queue:        tt.fields.queue,
+				workItems:    tt.fields.queue,
 				traceWorkers: tt.fields.traceWorkers,
 			}
 			e.Start()
@@ -116,7 +117,7 @@ func TestExecutor_Stop(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queue:        tt.fields.queue,
+				workItems:    tt.fields.queue,
 				traceWorkers: tt.fields.traceWorkers,
 			}
 			e.Stop()
@@ -198,7 +199,7 @@ func TestExecutor_Submit(t *testing.T) {
 					running:      executor.running,
 					shutdown:     executor.shutdown,
 					worker:       executor.worker,
-					queue:        executor.queue,
+					queue:        executor.workItems,
 					traceWorkers: true,
 				}
 			}(),
@@ -222,7 +223,7 @@ func TestExecutor_Submit(t *testing.T) {
 				running:      tt.fields.running,
 				shutdown:     tt.fields.shutdown,
 				worker:       tt.fields.worker,
-				queue:        tt.fields.queue,
+				workItems:    tt.fields.queue,
 				traceWorkers: tt.fields.traceWorkers,
 			}
 			e.Start()
@@ -239,7 +240,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
 	type args struct {
 		numberOfWorkers int
 		queueDepth      int
-		options         []ExecutorOption
+		options         []options.WithOption
 	}
 	tests := []struct {
 		name              string
@@ -251,10 +252,10 @@ func TestNewFixedSizeExecutor(t *testing.T) {
 			args: args{
 				numberOfWorkers: 13,
 				queueDepth:      14,
-				options:         []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
+				options:         []options.WithOption{WithExecutorOptionTracerWorkers(true)},
 			},
 			executorValidator: func(t *testing.T, e *executor) bool {
-				return !e.running && !e.shutdown && len(e.worker) == 13 && cap(e.queue) == 14
+				return !e.running && !e.shutdown && len(e.worker) == 13 && cap(e.workItems) == 14
 			},
 		},
 	}
@@ -271,7 +272,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 	type args struct {
 		numberOfWorkers int
 		queueDepth      int
-		options         []ExecutorOption
+		options         []options.WithOption
 	}
 	tests := []struct {
 		name              string
@@ -284,13 +285,13 @@ func TestNewDynamicExecutor(t *testing.T) {
 			args: args{
 				numberOfWorkers: 13,
 				queueDepth:      14,
-				options:         []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
+				options:         []options.WithOption{WithExecutorOptionTracerWorkers(true)},
 			},
 			executorValidator: func(t *testing.T, e *executor) bool {
 				assert.False(t, e.running)
 				assert.False(t, e.shutdown)
 				assert.Len(t, e.worker, 1)
-				assert.Equal(t, cap(e.queue), 14)
+				assert.Equal(t, cap(e.workItems), 14)
 				return true
 			},
 		},
@@ -299,7 +300,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 			args: args{
 				numberOfWorkers: 2,
 				queueDepth:      2,
-				options:         []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
+				options:         []options.WithOption{WithExecutorOptionTracerWorkers(true)},
 			},
 			manipulator: func(t *testing.T, e *executor) {
 				{
@@ -331,7 +332,7 @@ func TestNewDynamicExecutor(t *testing.T) {
 				t.Log("fill some jobs")
 				go func() {
 					for i := 0; i < 500; i++ {
-						e.queue <- workItem{
+						e.workItems <- workItem{
 							workItemId: int32(i),
 							runnable: func() {
 								max := 100
@@ -372,21 +373,17 @@ func TestWithExecutorOptionTracerWorkers(t *testing.T) {
 	tests := []struct {
 		name              string
 		args              args
-		executorValidator func(*testing.T, *executor) bool
+		executorValidator options.WithOption
 	}{
 		{
-			name: "option should set option",
-			args: args{traceWorkers: true},
-			executorValidator: func(t *testing.T, e *executor) bool {
-				return e.traceWorkers == true
-			},
+			name:              "option should set option",
+			args:              args{traceWorkers: true},
+			executorValidator: &tracerWorkersOption{traceWorkers: true},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			var _executor executor
-			WithExecutorOptionTracerWorkers(tt.args.traceWorkers)(&_executor)
-			assert.True(t, tt.executorValidator(t, &_executor))
+			assert.Equal(t, tt.executorValidator, WithExecutorOptionTracerWorkers(tt.args.traceWorkers))
 		})
 	}
 }
@@ -440,11 +437,11 @@ func TestWorker_work(t *testing.T) {
 				id: 0,
 				executor: func() *executor {
 					e := &executor{
-						queue:        make(chan workItem),
+						workItems:    make(chan workItem),
 						traceWorkers: true,
 					}
 					go func() {
-						e.queue <- workItem{
+						e.workItems <- workItem{
 							workItemId: 0,
 							runnable: func() {
 								panic("Oh no what should I do???")
@@ -474,11 +471,11 @@ func TestWorker_work(t *testing.T) {
 				id: 1,
 				executor: func() *executor {
 					e := &executor{
-						queue:        make(chan workItem),
+						workItems:    make(chan workItem),
 						traceWorkers: true,
 					}
 					go func() {
-						e.queue <- workItem{
+						e.workItems <- workItem{
 							workItemId: 0,
 							runnable: func() {
 								time.Sleep(time.Millisecond * 70)
@@ -507,7 +504,7 @@ func TestWorker_work(t *testing.T) {
 				id: 1,
 				executor: func() *executor {
 					e := &executor{
-						queue:        make(chan workItem),
+						workItems:    make(chan workItem),
 						traceWorkers: true,
 					}
 					return e
@@ -532,13 +529,13 @@ func TestWorker_work(t *testing.T) {
 				id: 1,
 				executor: func() *executor {
 					e := &executor{
-						queue:        make(chan workItem),
+						workItems:    make(chan workItem),
 						traceWorkers: true,
 					}
 					go func() {
 						completionFuture := &future{}
 						completionFuture.cancelRequested.Store(true)
-						e.queue <- workItem{
+						e.workItems <- workItem{
 							workItemId: 0,
 							runnable: func() {
 								time.Sleep(time.Millisecond * 70)
diff --git a/plc4go/spi/utils/mock_ExecutorOption_test.go b/plc4go/spi/utils/mock_ExecutorOption_test.go
deleted file mode 100644
index 9d0a05afa5..0000000000
--- a/plc4go/spi/utils/mock_ExecutorOption_test.go
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// Code generated by mockery v2.28.1. DO NOT EDIT.
-
-package utils
-
-import mock "github.com/stretchr/testify/mock"
-
-// MockExecutorOption is an autogenerated mock type for the ExecutorOption type
-type MockExecutorOption struct {
-	mock.Mock
-}
-
-type MockExecutorOption_Expecter struct {
-	mock *mock.Mock
-}
-
-func (_m *MockExecutorOption) EXPECT() *MockExecutorOption_Expecter {
-	return &MockExecutorOption_Expecter{mock: &_m.Mock}
-}
-
-// Execute provides a mock function with given fields: _a0
-func (_m *MockExecutorOption) Execute(_a0 *executor) {
-	_m.Called(_a0)
-}
-
-// MockExecutorOption_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'
-type MockExecutorOption_Execute_Call struct {
-	*mock.Call
-}
-
-// Execute is a helper method to define mock.On call
-//   - _a0 *executor
-func (_e *MockExecutorOption_Expecter) Execute(_a0 interface{}) *MockExecutorOption_Execute_Call {
-	return &MockExecutorOption_Execute_Call{Call: _e.mock.On("Execute", _a0)}
-}
-
-func (_c *MockExecutorOption_Execute_Call) Run(run func(_a0 *executor)) *MockExecutorOption_Execute_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(*executor))
-	})
-	return _c
-}
-
-func (_c *MockExecutorOption_Execute_Call) Return() *MockExecutorOption_Execute_Call {
-	_c.Call.Return()
-	return _c
-}
-
-func (_c *MockExecutorOption_Execute_Call) RunAndReturn(run func(*executor)) *MockExecutorOption_Execute_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-type mockConstructorTestingTNewMockExecutorOption interface {
-	mock.TestingT
-	Cleanup(func())
-}
-
-// NewMockExecutorOption creates a new instance of MockExecutorOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-func NewMockExecutorOption(t mockConstructorTestingTNewMockExecutorOption) *MockExecutorOption {
-	mock := &MockExecutorOption{}
-	mock.Mock.Test(t)
-
-	t.Cleanup(func() { mock.AssertExpectations(t) })
-
-	return mock
-}
diff --git a/plc4go/spi/utils/mock_Executor_test.go b/plc4go/spi/utils/mock_Executor_test.go
deleted file mode 100644
index fa86fa9cf4..0000000000
--- a/plc4go/spi/utils/mock_Executor_test.go
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// Code generated by mockery v2.28.1. DO NOT EDIT.
-
-package utils
-
-import (
-	context "context"
-
-	mock "github.com/stretchr/testify/mock"
-)
-
-// MockExecutor is an autogenerated mock type for the Executor type
-type MockExecutor struct {
-	mock.Mock
-}
-
-type MockExecutor_Expecter struct {
-	mock *mock.Mock
-}
-
-func (_m *MockExecutor) EXPECT() *MockExecutor_Expecter {
-	return &MockExecutor_Expecter{mock: &_m.Mock}
-}
-
-// IsRunning provides a mock function with given fields:
-func (_m *MockExecutor) IsRunning() bool {
-	ret := _m.Called()
-
-	var r0 bool
-	if rf, ok := ret.Get(0).(func() bool); ok {
-		r0 = rf()
-	} else {
-		r0 = ret.Get(0).(bool)
-	}
-
-	return r0
-}
-
-// MockExecutor_IsRunning_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsRunning'
-type MockExecutor_IsRunning_Call struct {
-	*mock.Call
-}
-
-// IsRunning is a helper method to define mock.On call
-func (_e *MockExecutor_Expecter) IsRunning() *MockExecutor_IsRunning_Call {
-	return &MockExecutor_IsRunning_Call{Call: _e.mock.On("IsRunning")}
-}
-
-func (_c *MockExecutor_IsRunning_Call) Run(run func()) *MockExecutor_IsRunning_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run()
-	})
-	return _c
-}
-
-func (_c *MockExecutor_IsRunning_Call) Return(_a0 bool) *MockExecutor_IsRunning_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
-
-func (_c *MockExecutor_IsRunning_Call) RunAndReturn(run func() bool) *MockExecutor_IsRunning_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-// Start provides a mock function with given fields:
-func (_m *MockExecutor) Start() {
-	_m.Called()
-}
-
-// MockExecutor_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
-type MockExecutor_Start_Call struct {
-	*mock.Call
-}
-
-// Start is a helper method to define mock.On call
-func (_e *MockExecutor_Expecter) Start() *MockExecutor_Start_Call {
-	return &MockExecutor_Start_Call{Call: _e.mock.On("Start")}
-}
-
-func (_c *MockExecutor_Start_Call) Run(run func()) *MockExecutor_Start_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run()
-	})
-	return _c
-}
-
-func (_c *MockExecutor_Start_Call) Return() *MockExecutor_Start_Call {
-	_c.Call.Return()
-	return _c
-}
-
-func (_c *MockExecutor_Start_Call) RunAndReturn(run func()) *MockExecutor_Start_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-// Stop provides a mock function with given fields:
-func (_m *MockExecutor) Stop() {
-	_m.Called()
-}
-
-// MockExecutor_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
-type MockExecutor_Stop_Call struct {
-	*mock.Call
-}
-
-// Stop is a helper method to define mock.On call
-func (_e *MockExecutor_Expecter) Stop() *MockExecutor_Stop_Call {
-	return &MockExecutor_Stop_Call{Call: _e.mock.On("Stop")}
-}
-
-func (_c *MockExecutor_Stop_Call) Run(run func()) *MockExecutor_Stop_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run()
-	})
-	return _c
-}
-
-func (_c *MockExecutor_Stop_Call) Return() *MockExecutor_Stop_Call {
-	_c.Call.Return()
-	return _c
-}
-
-func (_c *MockExecutor_Stop_Call) RunAndReturn(run func()) *MockExecutor_Stop_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-// Submit provides a mock function with given fields: ctx, workItemId, runnable
-func (_m *MockExecutor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
-	ret := _m.Called(ctx, workItemId, runnable)
-
-	var r0 CompletionFuture
-	if rf, ok := ret.Get(0).(func(context.Context, int32, Runnable) CompletionFuture); ok {
-		r0 = rf(ctx, workItemId, runnable)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(CompletionFuture)
-		}
-	}
-
-	return r0
-}
-
-// MockExecutor_Submit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Submit'
-type MockExecutor_Submit_Call struct {
-	*mock.Call
-}
-
-// Submit is a helper method to define mock.On call
-//   - ctx context.Context
-//   - workItemId int32
-//   - runnable Runnable
-func (_e *MockExecutor_Expecter) Submit(ctx interface{}, workItemId interface{}, runnable interface{}) *MockExecutor_Submit_Call {
-	return &MockExecutor_Submit_Call{Call: _e.mock.On("Submit", ctx, workItemId, runnable)}
-}
-
-func (_c *MockExecutor_Submit_Call) Run(run func(ctx context.Context, workItemId int32, runnable Runnable)) *MockExecutor_Submit_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(int32), args[2].(Runnable))
-	})
-	return _c
-}
-
-func (_c *MockExecutor_Submit_Call) Return(_a0 CompletionFuture) *MockExecutor_Submit_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
-
-func (_c *MockExecutor_Submit_Call) RunAndReturn(run func(context.Context, int32, Runnable) CompletionFuture) *MockExecutor_Submit_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-type mockConstructorTestingTNewMockExecutor interface {
-	mock.TestingT
-	Cleanup(func())
-}
-
-// NewMockExecutor creates a new instance of MockExecutor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-func NewMockExecutor(t mockConstructorTestingTNewMockExecutor) *MockExecutor {
-	mock := &MockExecutor{}
-	mock.Mock.Test(t)
-
-	t.Cleanup(func() { mock.AssertExpectations(t) })
-
-	return mock
-}