You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/05/31 12:54:56 UTC
[plc4x] 02/03: refactor(plc4go/spi): move WorkerPool to own package
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit c4bf3ebd6206d470ab570c4d67710b95fe153c42
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed May 31 14:49:15 2023 +0200
refactor(plc4go/spi): move WorkerPool to own package
---
plc4go/spi/{utils => pool}/WorkerPool.go | 82 ++++++----
plc4go/spi/{utils => pool}/WorkerPool_test.go | 55 ++++---
plc4go/spi/utils/mock_ExecutorOption_test.go | 85 -----------
plc4go/spi/utils/mock_Executor_test.go | 207 --------------------------
4 files changed, 82 insertions(+), 347 deletions(-)
diff --git a/plc4go/spi/utils/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
similarity index 80%
rename from plc4go/spi/utils/WorkerPool.go
rename to plc4go/spi/pool/WorkerPool.go
index a1d8202a7c..ae1f59849f 100644
--- a/plc4go/spi/utils/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -17,14 +17,16 @@
* under the License.
*/
-package utils
+package pool
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
+ "io"
"runtime/debug"
"sync"
"sync/atomic"
@@ -41,6 +43,8 @@ type worker struct {
executor *executor
hasEnded atomic.Bool
lastReceived time.Time
+
+ log zerolog.Logger
}
func (w *worker) initialize() {
@@ -54,7 +58,7 @@ func (w *worker) initialize() {
func (w *worker) work() {
defer func() {
if recovered := recover(); recovered != nil {
- log.Error().Msgf("Recovering from panic():%v. Stack: %s", recovered, debug.Stack())
+ w.log.Error().Msgf("Recovering from panic():%v. Stack: %s", recovered, debug.Stack())
}
if !w.shutdown.Load() {
// if we are not in shutdown we continue
@@ -72,7 +76,7 @@ func (w *worker) work() {
for !w.shutdown.Load() {
workerLog.Debug().Msg("Working")
select {
- case _workItem := <-w.executor.queue:
+ case _workItem := <-w.executor.workItems:
w.lastReceived = time.Now()
workerLog.Debug().Msgf("Got work item %v", _workItem)
if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
@@ -103,6 +107,7 @@ func (w *workItem) String() string {
}
type Executor interface {
+ io.Closer
Start()
Stop()
Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture
@@ -115,24 +120,32 @@ type executor struct {
shutdown bool
stateChange sync.Mutex
worker []*worker
- queue chan workItem
+ workItems chan workItem
traceWorkers bool
+
+ log zerolog.Logger
}
-func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, options ...ExecutorOption) Executor {
+func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
workers := make([]*worker, numberOfWorkers)
+ customLogger := options.ExtractCustomLogger(_options...)
for i := 0; i < numberOfWorkers; i++ {
workers[i] = &worker{
- id: i,
+ id: i,
+ log: customLogger,
}
}
_executor := &executor{
maxNumberOfWorkers: numberOfWorkers,
- queue: make(chan workItem, queueDepth),
+ workItems: make(chan workItem, queueDepth),
worker: workers,
+ log: customLogger,
}
- for _, option := range options {
- option(_executor)
+ for _, option := range _options {
+ switch option := option.(type) {
+ case *tracerWorkersOption:
+ _executor.traceWorkers = option.traceWorkers
+ }
}
for i := 0; i < numberOfWorkers; i++ {
workers[i].executor = _executor
@@ -144,14 +157,19 @@ var upScaleInterval = 100 * time.Millisecond
var downScaleInterval = 5 * time.Second
var timeToBecomeUnused = 5 * time.Second
-func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorOption) Executor {
+func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.WithOption) Executor {
+ customLogger := options.ExtractCustomLogger(_options...)
_executor := &executor{
maxNumberOfWorkers: maxNumberOfWorkers,
- queue: make(chan workItem, queueDepth),
+ workItems: make(chan workItem, queueDepth),
worker: make([]*worker, 0),
+ log: customLogger,
}
- for _, option := range options {
- option(_executor)
+ for _, option := range _options {
+ switch option := option.(type) {
+ case *tracerWorkersOption:
+ _executor.traceWorkers = option.traceWorkers
+ }
}
// We spawn one initial worker
_executor.worker = append(_executor.worker, &worker{
@@ -159,13 +177,14 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
interrupter: make(chan struct{}, 1),
executor: _executor,
lastReceived: time.Now(),
+ log: customLogger,
})
mutex := sync.Mutex{}
// Worker spawner
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ customLogger.Error().Msgf("panic-ed %v", err)
}
}()
workerLog := log.With().Str("Worker type", "spawner").Logger()
@@ -176,7 +195,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
workerLog.Debug().Msgf("Sleeping for %v", upScaleInterval)
time.Sleep(upScaleInterval)
mutex.Lock()
- numberOfItemsInQueue := len(_executor.queue)
+ numberOfItemsInQueue := len(_executor.workItems)
numberOfWorkers := len(_executor.worker)
workerLog.Debug().Msgf("Checking if %d > %d && %d < %d", numberOfItemsInQueue, numberOfWorkers, numberOfWorkers, maxNumberOfWorkers)
if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < maxNumberOfWorkers {
@@ -185,6 +204,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
interrupter: make(chan struct{}, 1),
executor: _executor,
lastReceived: time.Now(),
+ log: customLogger,
}
_executor.worker = append(_executor.worker, _worker)
_worker.initialize()
@@ -200,7 +220,7 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
go func() {
defer func() {
if err := recover(); err != nil {
- log.Error().Msgf("panic-ed %v", err)
+ _executor.log.Error().Msgf("panic-ed %v", err)
}
}()
workerLog := log.With().Str("Worker type", "killer").Logger()
@@ -231,12 +251,13 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, options ...ExecutorO
return _executor
}
-type ExecutorOption func(*executor)
+func WithExecutorOptionTracerWorkers(traceWorkers bool) options.WithOption {
+ return &tracerWorkersOption{traceWorkers: traceWorkers}
+}
-func WithExecutorOptionTracerWorkers(traceWorkers bool) ExecutorOption {
- return func(executor *executor) {
- executor.traceWorkers = traceWorkers
- }
+type tracerWorkersOption struct {
+ options.Option
+ traceWorkers bool
}
func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
@@ -245,20 +266,24 @@ func (e *executor) Submit(ctx context.Context, workItemId int32, runnable Runnab
value.Store(errors.New("runnable must not be nil"))
return &future{err: value}
}
- log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
+ e.log.Trace().Int32("workItemId", workItemId).Msg("Submitting runnable")
completionFuture := &future{}
+ if e.shutdown {
+ completionFuture.Cancel(false, errors.New("executor in shutdown"))
+ return completionFuture
+ }
select {
- case e.queue <- workItem{
+ case e.workItems <- workItem{
workItemId: workItemId,
runnable: runnable,
completionFuture: completionFuture,
}:
- log.Trace().Msg("Item added")
+ e.log.Trace().Msg("Item added")
case <-ctx.Done():
completionFuture.Cancel(false, ctx.Err())
}
- log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
+ e.log.Trace().Int32("workItemId", workItemId).Msg("runnable queued")
return completionFuture
}
@@ -284,17 +309,22 @@ func (e *executor) Stop() {
return
}
e.shutdown = true
- close(e.queue)
for i := 0; i < len(e.worker); i++ {
worker := e.worker[i]
worker.shutdown.Store(true)
worker.interrupted.Store(true)
close(worker.interrupter)
}
+ close(e.workItems)
e.running = false
e.shutdown = false
}
+func (e *executor) Close() error {
+ e.Stop()
+ return nil
+}
+
func (e *executor) IsRunning() bool {
return e.running && !e.shutdown
}
diff --git a/plc4go/spi/utils/WorkerPool_test.go b/plc4go/spi/pool/WorkerPool_test.go
similarity index 93%
rename from plc4go/spi/utils/WorkerPool_test.go
rename to plc4go/spi/pool/WorkerPool_test.go
index fcb006b982..d940b1c545 100644
--- a/plc4go/spi/utils/WorkerPool_test.go
+++ b/plc4go/spi/pool/WorkerPool_test.go
@@ -17,11 +17,12 @@
* under the License.
*/
-package utils
+package pool
import (
"context"
"fmt"
+ "github.com/apache/plc4x/plc4go/spi/options"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"math/rand"
@@ -68,7 +69,7 @@ func TestExecutor_Start(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queue: tt.fields.queue,
+ workItems: tt.fields.queue,
traceWorkers: tt.fields.traceWorkers,
}
e.Start()
@@ -116,7 +117,7 @@ func TestExecutor_Stop(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queue: tt.fields.queue,
+ workItems: tt.fields.queue,
traceWorkers: tt.fields.traceWorkers,
}
e.Stop()
@@ -198,7 +199,7 @@ func TestExecutor_Submit(t *testing.T) {
running: executor.running,
shutdown: executor.shutdown,
worker: executor.worker,
- queue: executor.queue,
+ queue: executor.workItems,
traceWorkers: true,
}
}(),
@@ -222,7 +223,7 @@ func TestExecutor_Submit(t *testing.T) {
running: tt.fields.running,
shutdown: tt.fields.shutdown,
worker: tt.fields.worker,
- queue: tt.fields.queue,
+ workItems: tt.fields.queue,
traceWorkers: tt.fields.traceWorkers,
}
e.Start()
@@ -239,7 +240,7 @@ func TestNewFixedSizeExecutor(t *testing.T) {
type args struct {
numberOfWorkers int
queueDepth int
- options []ExecutorOption
+ options []options.WithOption
}
tests := []struct {
name string
@@ -251,10 +252,10 @@ func TestNewFixedSizeExecutor(t *testing.T) {
args: args{
numberOfWorkers: 13,
queueDepth: 14,
- options: []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
+ options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
},
executorValidator: func(t *testing.T, e *executor) bool {
- return !e.running && !e.shutdown && len(e.worker) == 13 && cap(e.queue) == 14
+ return !e.running && !e.shutdown && len(e.worker) == 13 && cap(e.workItems) == 14
},
},
}
@@ -271,7 +272,7 @@ func TestNewDynamicExecutor(t *testing.T) {
type args struct {
numberOfWorkers int
queueDepth int
- options []ExecutorOption
+ options []options.WithOption
}
tests := []struct {
name string
@@ -284,13 +285,13 @@ func TestNewDynamicExecutor(t *testing.T) {
args: args{
numberOfWorkers: 13,
queueDepth: 14,
- options: []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
+ options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
},
executorValidator: func(t *testing.T, e *executor) bool {
assert.False(t, e.running)
assert.False(t, e.shutdown)
assert.Len(t, e.worker, 1)
- assert.Equal(t, cap(e.queue), 14)
+ assert.Equal(t, cap(e.workItems), 14)
return true
},
},
@@ -299,7 +300,7 @@ func TestNewDynamicExecutor(t *testing.T) {
args: args{
numberOfWorkers: 2,
queueDepth: 2,
- options: []ExecutorOption{WithExecutorOptionTracerWorkers(true)},
+ options: []options.WithOption{WithExecutorOptionTracerWorkers(true)},
},
manipulator: func(t *testing.T, e *executor) {
{
@@ -331,7 +332,7 @@ func TestNewDynamicExecutor(t *testing.T) {
t.Log("fill some jobs")
go func() {
for i := 0; i < 500; i++ {
- e.queue <- workItem{
+ e.workItems <- workItem{
workItemId: int32(i),
runnable: func() {
max := 100
@@ -372,21 +373,17 @@ func TestWithExecutorOptionTracerWorkers(t *testing.T) {
tests := []struct {
name string
args args
- executorValidator func(*testing.T, *executor) bool
+ executorValidator options.WithOption
}{
{
- name: "option should set option",
- args: args{traceWorkers: true},
- executorValidator: func(t *testing.T, e *executor) bool {
- return e.traceWorkers == true
- },
+ name: "option should set option",
+ args: args{traceWorkers: true},
+ executorValidator: &tracerWorkersOption{traceWorkers: true},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- var _executor executor
- WithExecutorOptionTracerWorkers(tt.args.traceWorkers)(&_executor)
- assert.True(t, tt.executorValidator(t, &_executor))
+ assert.Equal(t, tt.executorValidator, WithExecutorOptionTracerWorkers(tt.args.traceWorkers))
})
}
}
@@ -440,11 +437,11 @@ func TestWorker_work(t *testing.T) {
id: 0,
executor: func() *executor {
e := &executor{
- queue: make(chan workItem),
+ workItems: make(chan workItem),
traceWorkers: true,
}
go func() {
- e.queue <- workItem{
+ e.workItems <- workItem{
workItemId: 0,
runnable: func() {
panic("Oh no what should I do???")
@@ -474,11 +471,11 @@ func TestWorker_work(t *testing.T) {
id: 1,
executor: func() *executor {
e := &executor{
- queue: make(chan workItem),
+ workItems: make(chan workItem),
traceWorkers: true,
}
go func() {
- e.queue <- workItem{
+ e.workItems <- workItem{
workItemId: 0,
runnable: func() {
time.Sleep(time.Millisecond * 70)
@@ -507,7 +504,7 @@ func TestWorker_work(t *testing.T) {
id: 1,
executor: func() *executor {
e := &executor{
- queue: make(chan workItem),
+ workItems: make(chan workItem),
traceWorkers: true,
}
return e
@@ -532,13 +529,13 @@ func TestWorker_work(t *testing.T) {
id: 1,
executor: func() *executor {
e := &executor{
- queue: make(chan workItem),
+ workItems: make(chan workItem),
traceWorkers: true,
}
go func() {
completionFuture := &future{}
completionFuture.cancelRequested.Store(true)
- e.queue <- workItem{
+ e.workItems <- workItem{
workItemId: 0,
runnable: func() {
time.Sleep(time.Millisecond * 70)
diff --git a/plc4go/spi/utils/mock_ExecutorOption_test.go b/plc4go/spi/utils/mock_ExecutorOption_test.go
deleted file mode 100644
index 9d0a05afa5..0000000000
--- a/plc4go/spi/utils/mock_ExecutorOption_test.go
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// Code generated by mockery v2.28.1. DO NOT EDIT.
-
-package utils
-
-import mock "github.com/stretchr/testify/mock"
-
-// MockExecutorOption is an autogenerated mock type for the ExecutorOption type
-type MockExecutorOption struct {
- mock.Mock
-}
-
-type MockExecutorOption_Expecter struct {
- mock *mock.Mock
-}
-
-func (_m *MockExecutorOption) EXPECT() *MockExecutorOption_Expecter {
- return &MockExecutorOption_Expecter{mock: &_m.Mock}
-}
-
-// Execute provides a mock function with given fields: _a0
-func (_m *MockExecutorOption) Execute(_a0 *executor) {
- _m.Called(_a0)
-}
-
-// MockExecutorOption_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute'
-type MockExecutorOption_Execute_Call struct {
- *mock.Call
-}
-
-// Execute is a helper method to define mock.On call
-// - _a0 *executor
-func (_e *MockExecutorOption_Expecter) Execute(_a0 interface{}) *MockExecutorOption_Execute_Call {
- return &MockExecutorOption_Execute_Call{Call: _e.mock.On("Execute", _a0)}
-}
-
-func (_c *MockExecutorOption_Execute_Call) Run(run func(_a0 *executor)) *MockExecutorOption_Execute_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(*executor))
- })
- return _c
-}
-
-func (_c *MockExecutorOption_Execute_Call) Return() *MockExecutorOption_Execute_Call {
- _c.Call.Return()
- return _c
-}
-
-func (_c *MockExecutorOption_Execute_Call) RunAndReturn(run func(*executor)) *MockExecutorOption_Execute_Call {
- _c.Call.Return(run)
- return _c
-}
-
-type mockConstructorTestingTNewMockExecutorOption interface {
- mock.TestingT
- Cleanup(func())
-}
-
-// NewMockExecutorOption creates a new instance of MockExecutorOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-func NewMockExecutorOption(t mockConstructorTestingTNewMockExecutorOption) *MockExecutorOption {
- mock := &MockExecutorOption{}
- mock.Mock.Test(t)
-
- t.Cleanup(func() { mock.AssertExpectations(t) })
-
- return mock
-}
diff --git a/plc4go/spi/utils/mock_Executor_test.go b/plc4go/spi/utils/mock_Executor_test.go
deleted file mode 100644
index fa86fa9cf4..0000000000
--- a/plc4go/spi/utils/mock_Executor_test.go
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-// Code generated by mockery v2.28.1. DO NOT EDIT.
-
-package utils
-
-import (
- context "context"
-
- mock "github.com/stretchr/testify/mock"
-)
-
-// MockExecutor is an autogenerated mock type for the Executor type
-type MockExecutor struct {
- mock.Mock
-}
-
-type MockExecutor_Expecter struct {
- mock *mock.Mock
-}
-
-func (_m *MockExecutor) EXPECT() *MockExecutor_Expecter {
- return &MockExecutor_Expecter{mock: &_m.Mock}
-}
-
-// IsRunning provides a mock function with given fields:
-func (_m *MockExecutor) IsRunning() bool {
- ret := _m.Called()
-
- var r0 bool
- if rf, ok := ret.Get(0).(func() bool); ok {
- r0 = rf()
- } else {
- r0 = ret.Get(0).(bool)
- }
-
- return r0
-}
-
-// MockExecutor_IsRunning_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IsRunning'
-type MockExecutor_IsRunning_Call struct {
- *mock.Call
-}
-
-// IsRunning is a helper method to define mock.On call
-func (_e *MockExecutor_Expecter) IsRunning() *MockExecutor_IsRunning_Call {
- return &MockExecutor_IsRunning_Call{Call: _e.mock.On("IsRunning")}
-}
-
-func (_c *MockExecutor_IsRunning_Call) Run(run func()) *MockExecutor_IsRunning_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run()
- })
- return _c
-}
-
-func (_c *MockExecutor_IsRunning_Call) Return(_a0 bool) *MockExecutor_IsRunning_Call {
- _c.Call.Return(_a0)
- return _c
-}
-
-func (_c *MockExecutor_IsRunning_Call) RunAndReturn(run func() bool) *MockExecutor_IsRunning_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// Start provides a mock function with given fields:
-func (_m *MockExecutor) Start() {
- _m.Called()
-}
-
-// MockExecutor_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start'
-type MockExecutor_Start_Call struct {
- *mock.Call
-}
-
-// Start is a helper method to define mock.On call
-func (_e *MockExecutor_Expecter) Start() *MockExecutor_Start_Call {
- return &MockExecutor_Start_Call{Call: _e.mock.On("Start")}
-}
-
-func (_c *MockExecutor_Start_Call) Run(run func()) *MockExecutor_Start_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run()
- })
- return _c
-}
-
-func (_c *MockExecutor_Start_Call) Return() *MockExecutor_Start_Call {
- _c.Call.Return()
- return _c
-}
-
-func (_c *MockExecutor_Start_Call) RunAndReturn(run func()) *MockExecutor_Start_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// Stop provides a mock function with given fields:
-func (_m *MockExecutor) Stop() {
- _m.Called()
-}
-
-// MockExecutor_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop'
-type MockExecutor_Stop_Call struct {
- *mock.Call
-}
-
-// Stop is a helper method to define mock.On call
-func (_e *MockExecutor_Expecter) Stop() *MockExecutor_Stop_Call {
- return &MockExecutor_Stop_Call{Call: _e.mock.On("Stop")}
-}
-
-func (_c *MockExecutor_Stop_Call) Run(run func()) *MockExecutor_Stop_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run()
- })
- return _c
-}
-
-func (_c *MockExecutor_Stop_Call) Return() *MockExecutor_Stop_Call {
- _c.Call.Return()
- return _c
-}
-
-func (_c *MockExecutor_Stop_Call) RunAndReturn(run func()) *MockExecutor_Stop_Call {
- _c.Call.Return(run)
- return _c
-}
-
-// Submit provides a mock function with given fields: ctx, workItemId, runnable
-func (_m *MockExecutor) Submit(ctx context.Context, workItemId int32, runnable Runnable) CompletionFuture {
- ret := _m.Called(ctx, workItemId, runnable)
-
- var r0 CompletionFuture
- if rf, ok := ret.Get(0).(func(context.Context, int32, Runnable) CompletionFuture); ok {
- r0 = rf(ctx, workItemId, runnable)
- } else {
- if ret.Get(0) != nil {
- r0 = ret.Get(0).(CompletionFuture)
- }
- }
-
- return r0
-}
-
-// MockExecutor_Submit_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Submit'
-type MockExecutor_Submit_Call struct {
- *mock.Call
-}
-
-// Submit is a helper method to define mock.On call
-// - ctx context.Context
-// - workItemId int32
-// - runnable Runnable
-func (_e *MockExecutor_Expecter) Submit(ctx interface{}, workItemId interface{}, runnable interface{}) *MockExecutor_Submit_Call {
- return &MockExecutor_Submit_Call{Call: _e.mock.On("Submit", ctx, workItemId, runnable)}
-}
-
-func (_c *MockExecutor_Submit_Call) Run(run func(ctx context.Context, workItemId int32, runnable Runnable)) *MockExecutor_Submit_Call {
- _c.Call.Run(func(args mock.Arguments) {
- run(args[0].(context.Context), args[1].(int32), args[2].(Runnable))
- })
- return _c
-}
-
-func (_c *MockExecutor_Submit_Call) Return(_a0 CompletionFuture) *MockExecutor_Submit_Call {
- _c.Call.Return(_a0)
- return _c
-}
-
-func (_c *MockExecutor_Submit_Call) RunAndReturn(run func(context.Context, int32, Runnable) CompletionFuture) *MockExecutor_Submit_Call {
- _c.Call.Return(run)
- return _c
-}
-
-type mockConstructorTestingTNewMockExecutor interface {
- mock.TestingT
- Cleanup(func())
-}
-
-// NewMockExecutor creates a new instance of MockExecutor. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-func NewMockExecutor(t mockConstructorTestingTNewMockExecutor) *MockExecutor {
- mock := &MockExecutor{}
- mock.Mock.Test(t)
-
- t.Cleanup(func() { mock.AssertExpectations(t) })
-
- return mock
-}