You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/14 18:30:44 UTC

[plc4x] branch develop updated: fix(plc4go/spi): fix data race in executor

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 860a15b0e2 fix(plc4go/spi): fix data race in executor
860a15b0e2 is described below

commit 860a15b0e2c7f9d676aa589363e55e2024c40dd9
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Jun 14 20:30:37 2023 +0200

    fix(plc4go/spi): fix data race in executor
---
 plc4go/spi/pool/WorkerPool.go           | 19 +++++++++++--------
 plc4go/spi/pool/dynamicExecutor.go      | 18 +++++++++---------
 plc4go/spi/pool/dynamicExecutor_test.go |  9 ++++++++-
 plc4go/spi/pool/executor.go             |  9 ++++++---
 plc4go/spi/pool/executor_test.go        | 14 +++++++++-----
 plc4go/spi/pool/worker.go               |  6 +++---
 plc4go/spi/pool/worker_plc4xgen.go      | 19 +++++++++++++++++--
 plc4go/spi/pool/worker_test.go          | 24 ++++++++++++------------
 8 files changed, 75 insertions(+), 43 deletions(-)

diff --git a/plc4go/spi/pool/WorkerPool.go b/plc4go/spi/pool/WorkerPool.go
index 52a0c4df2b..82d0e7a4a1 100644
--- a/plc4go/spi/pool/WorkerPool.go
+++ b/plc4go/spi/pool/WorkerPool.go
@@ -45,10 +45,12 @@ func NewFixedSizeExecutor(numberOfWorkers, queueDepth int, _options ...options.W
 	workers := make([]*worker, numberOfWorkers)
 	customLogger := options.ExtractCustomLogger(_options...)
 	for i := 0; i < numberOfWorkers; i++ {
-		workers[i] = &worker{
+		w := worker{
 			id:  i,
 			log: customLogger,
 		}
+		w.lastReceived.Store(time.Time{})
+		workers[i] = &w
 	}
 	_executor := &executor{
 		queueDepth: queueDepth,
@@ -85,13 +87,14 @@ func NewDynamicExecutor(maxNumberOfWorkers, queueDepth int, _options ...options.
 		}
 	}
 	// We spawn one initial worker
-	_executor.worker = append(_executor.worker, &worker{
-		id:           0,
-		interrupter:  make(chan struct{}, 1),
-		executor:     _executor,
-		lastReceived: time.Now(),
-		log:          customLogger,
-	})
+	w := worker{
+		id:          0,
+		interrupter: make(chan struct{}, 1),
+		executor:    _executor,
+		log:         customLogger,
+	}
+	w.lastReceived.Store(time.Now())
+	_executor.worker = append(_executor.worker, &w)
 	return _executor
 }
 
diff --git a/plc4go/spi/pool/dynamicExecutor.go b/plc4go/spi/pool/dynamicExecutor.go
index 3af93a9244..12a39665ca 100644
--- a/plc4go/spi/pool/dynamicExecutor.go
+++ b/plc4go/spi/pool/dynamicExecutor.go
@@ -73,7 +73,7 @@ func (e *dynamicExecutor) Start() {
 		if !e.traceWorkers {
 			workerLog = zerolog.Nop()
 		}
-		for e.running && !e.shutdown {
+		for e.IsRunning() {
 			workerLog.Trace().Msg("running")
 			mutex.Lock()
 			numberOfItemsInQueue := len(e.workItems)
@@ -82,12 +82,12 @@ func (e *dynamicExecutor) Start() {
 			if numberOfItemsInQueue > numberOfWorkers && numberOfWorkers < e.maxNumberOfWorkers {
 				workerLog.Trace().Msg("spawning new worker")
 				_worker := &worker{
-					id:           numberOfWorkers - 1,
-					interrupter:  make(chan struct{}, 1),
-					executor:     e,
-					lastReceived: time.Now(),
-					log:          e.log,
+					id:          numberOfWorkers - 1,
+					interrupter: make(chan struct{}, 1),
+					executor:    e,
+					log:         e.log,
 				}
+				_worker.lastReceived.Store(time.Now())
 				e.worker = append(e.worker, _worker)
 				_worker.initialize()
 				workerLog.Info().Int("Worker id", _worker.id).Msg("spawning")
@@ -123,14 +123,14 @@ func (e *dynamicExecutor) Start() {
 		if !e.traceWorkers {
 			workerLog = zerolog.Nop()
 		}
-		for e.running && !e.shutdown {
+		for e.IsRunning() {
 			workerLog.Trace().Msg("running")
 			mutex.Lock()
 			newWorkers := make([]*worker, 0)
 			for _, _worker := range e.worker {
 				deadline := time.Now().Add(-timeToBecomeUnused)
-				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived, deadline)
-				if _worker.lastReceived.Before(deadline) {
+				workerLog.Debug().Int("Worker id", _worker.id).Msgf("Checking if %v is before %v", _worker.lastReceived.Load(), deadline)
+				if _worker.lastReceived.Load().(time.Time).Before(deadline) {
 					workerLog.Info().Int("Worker id", _worker.id).Msg("killing")
 					_worker.interrupted.Store(true)
 					close(_worker.interrupter)
diff --git a/plc4go/spi/pool/dynamicExecutor_test.go b/plc4go/spi/pool/dynamicExecutor_test.go
index 531867a71f..f1c8e4b4b2 100644
--- a/plc4go/spi/pool/dynamicExecutor_test.go
+++ b/plc4go/spi/pool/dynamicExecutor_test.go
@@ -21,6 +21,7 @@ package pool
 
 import (
 	"github.com/stretchr/testify/assert"
+	"sync/atomic"
 	"testing"
 	"time"
 )
@@ -185,7 +186,13 @@ func Test_dynamicExecutor_String(t *testing.T) {
 			fields: fields{
 				executor: &executor{
 					worker: []*worker{
-						{},
+						{
+							lastReceived: func() atomic.Value {
+								value := atomic.Value{}
+								value.Store(time.Time{})
+								return value
+							}(),
+						},
 					},
 				},
 				maxNumberOfWorkers: 3,
diff --git a/plc4go/spi/pool/executor.go b/plc4go/spi/pool/executor.go
index 104c7feae6..654edf2973 100644
--- a/plc4go/spi/pool/executor.go
+++ b/plc4go/spi/pool/executor.go
@@ -30,14 +30,15 @@ import (
 
 //go:generate go run ../../tools/plc4xgenerator/gen.go -type=executor
 type executor struct {
-	running      bool
-	shutdown     bool
-	stateChange  sync.Mutex
+	running  bool
+	shutdown bool
+
 	worker       []*worker
 	queueDepth   int
 	workItems    chan workItem
 	traceWorkers bool
 
+	stateChange     sync.RWMutex
 	workerWaitGroup sync.WaitGroup
 
 	log zerolog.Logger `ignore:"true"`
@@ -126,5 +127,7 @@ func (e *executor) Close() error {
 }
 
 func (e *executor) IsRunning() bool {
+	e.stateChange.RLock()
+	defer e.stateChange.RUnlock()
 	return e.running && !e.shutdown
 }
diff --git a/plc4go/spi/pool/executor_test.go b/plc4go/spi/pool/executor_test.go
index 229a7d9797..8bd13d6b0a 100644
--- a/plc4go/spi/pool/executor_test.go
+++ b/plc4go/spi/pool/executor_test.go
@@ -435,11 +435,15 @@ func Test_executor_String(t *testing.T) {
 				shutdown: true,
 				worker: []*worker{
 					{
-						id:           1,
-						shutdown:     atomic.Bool{},
-						interrupted:  atomic.Bool{},
-						hasEnded:     atomic.Bool{},
-						lastReceived: time.Time{},
+						id:          1,
+						shutdown:    atomic.Bool{},
+						interrupted: atomic.Bool{},
+						hasEnded:    atomic.Bool{},
+						lastReceived: func() atomic.Value {
+							value := atomic.Value{}
+							value.Store(time.Time{})
+							return value
+						}(),
 					},
 				},
 				queueDepth:   2,
diff --git a/plc4go/spi/pool/worker.go b/plc4go/spi/pool/worker.go
index 15313dd6ee..0fa5ae597b 100644
--- a/plc4go/spi/pool/worker.go
+++ b/plc4go/spi/pool/worker.go
@@ -40,7 +40,7 @@ type worker struct {
 		getWorkerWaitGroup() *sync.WaitGroup
 	}
 	hasEnded     atomic.Bool
-	lastReceived time.Time `stringer:"true"`
+	lastReceived atomic.Value
 
 	log zerolog.Logger `ignore:"true"`
 }
@@ -50,7 +50,7 @@ func (w *worker) initialize() {
 	w.interrupted.Store(false)
 	w.interrupter = make(chan struct{}, 1)
 	w.hasEnded.Store(false)
-	w.lastReceived = time.Now()
+	w.lastReceived.Store(time.Now())
 }
 
 func (w *worker) work() {
@@ -77,7 +77,7 @@ func (w *worker) work() {
 		workerLog.Debug().Msg("Working")
 		select {
 		case _workItem := <-w.executor.getWorksItems():
-			w.lastReceived = time.Now()
+			w.lastReceived.Store(time.Now())
 			workerLog.Debug().Msgf("Got work item %v", _workItem)
 			if _workItem.completionFuture.cancelRequested.Load() || (w.shutdown.Load() && w.interrupted.Load()) {
 				workerLog.Debug().Msg("We need to stop")
diff --git a/plc4go/spi/pool/worker_plc4xgen.go b/plc4go/spi/pool/worker_plc4xgen.go
index 85a6f1767c..23e8d1689c 100644
--- a/plc4go/spi/pool/worker_plc4xgen.go
+++ b/plc4go/spi/pool/worker_plc4xgen.go
@@ -64,8 +64,23 @@ func (d *worker) SerializeWithWriteBuffer(ctx context.Context, writeBuffer utils
 		return err
 	}
 
-	if err := writeBuffer.WriteString("lastReceived", uint32(len(d.lastReceived.String())*8), "UTF-8", d.lastReceived.String()); err != nil {
-		return err
+	if d.lastReceived.Load() != nil {
+		if serializableField, ok := d.lastReceived.Load().(utils.Serializable); ok {
+			if err := writeBuffer.PushContext("lastReceived"); err != nil {
+				return err
+			}
+			if err := serializableField.SerializeWithWriteBuffer(ctx, writeBuffer); err != nil {
+				return err
+			}
+			if err := writeBuffer.PopContext("lastReceived"); err != nil {
+				return err
+			}
+		} else {
+			stringValue := fmt.Sprintf("%v", d.lastReceived.Load())
+			if err := writeBuffer.WriteString("lastReceived", uint32(len(stringValue)*8), "UTF-8", stringValue); err != nil {
+				return err
+			}
+		}
 	}
 	if err := writeBuffer.PopContext("worker"); err != nil {
 		return err
diff --git a/plc4go/spi/pool/worker_test.go b/plc4go/spi/pool/worker_test.go
index 446885028b..b7729f3e3d 100644
--- a/plc4go/spi/pool/worker_test.go
+++ b/plc4go/spi/pool/worker_test.go
@@ -24,7 +24,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -37,8 +36,6 @@ func Test_worker_initialize(t *testing.T) {
 			getWorksItems() chan workItem
 			getWorkerWaitGroup() *sync.WaitGroup
 		}
-		lastReceived time.Time
-		log          zerolog.Logger
 	}
 	tests := []struct {
 		name   string
@@ -51,11 +48,10 @@ func Test_worker_initialize(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			w := &worker{
-				id:           tt.fields.id,
-				interrupter:  tt.fields.interrupter,
-				executor:     tt.fields.executor,
-				lastReceived: tt.fields.lastReceived,
-				log:          tt.fields.log,
+				id:          tt.fields.id,
+				interrupter: tt.fields.interrupter,
+				executor:    tt.fields.executor,
+				log:         produceTestLogger(t),
 			}
 			w.initialize()
 		})
@@ -205,6 +201,7 @@ func Test_worker_work(t *testing.T) {
 				id:          tt.fields.id,
 				interrupter: make(chan struct{}, 1),
 				executor:    tt.fields.executor,
+				log:         produceTestLogger(t),
 			}
 			go w.work()
 			if tt.firstValidation != nil {
@@ -222,14 +219,16 @@ func Test_worker_work(t *testing.T) {
 				t.Logf("secondValidation after %v", tt.timeBeforeSecondValidation)
 				tt.secondValidation(t, w)
 			}
+
+			close(w.interrupter)
+			time.Sleep(50 * time.Millisecond) // TODO: replace with worker stop... (which in turn ensures the worker is not running anymore)
 		})
 	}
 }
 
 func Test_worker_String(t *testing.T) {
 	type fields struct {
-		id           int
-		lastReceived time.Time
+		id int
 	}
 	tests := []struct {
 		name   string
@@ -249,9 +248,10 @@ func Test_worker_String(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			w := &worker{
-				id:           tt.fields.id,
-				lastReceived: tt.fields.lastReceived,
+				id:  tt.fields.id,
+				log: produceTestLogger(t),
 			}
+			w.lastReceived.Store(time.Time{})
 			assert.Equalf(t, tt.want, w.String(), "String()")
 		})
 	}