You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 22:00:06 UTC

[plc4x] 02/02: fix(plc4go/spi): potential fix with request transaction manager producing race conditions

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 32c5531d2f65bfe86759f3a8a7fb6a62cc4bc26e
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:59:57 2023 +0200

    fix(plc4go/spi): potential fix with request transaction manager producing race conditions
---
 .../spi/transactions/RequestTransactionManager.go  | 10 ++++----
 .../transactions/RequestTransactionManager_test.go | 27 +++++++++-------------
 .../requestTransactionManager_plc4xgen.go          |  2 +-
 3 files changed, 17 insertions(+), 22 deletions(-)

diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 55cef95f96..4d8105b7f3 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -24,10 +24,10 @@ import (
 	"context"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
-	"github.com/rs/zerolog/log"
 	"io"
 	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/apache/plc4x/plc4go/pkg/api/config"
@@ -44,7 +44,7 @@ func init() {
 		runtime.NumCPU(),
 		100,
 		options.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers),
-		config.WithCustomLogger(log.With().Str("executorInstance", "shared logger").Logger()),
+		config.WithCustomLogger(zerolog.Nop()),
 	)
 	sharedExecutorInstance.Start()
 	runtime.SetFinalizer(sharedExecutorInstance, func(sharedExecutorInstance pool.Executor) {
@@ -117,7 +117,7 @@ type requestTransactionManager struct {
 
 	executor pool.Executor
 
-	shutdown bool // Indicates it this rtm is in shutdown
+	shutdown atomic.Bool // Indicates if this rtm is in shutdown
 
 	traceTransactionManagerTransactions bool // flag set to true if it should trace transactions
 
@@ -188,7 +188,7 @@ func (r *requestTransactionManager) StartTransaction() RequestTransaction {
 		transactionId:  currentTransactionId,
 		transactionLog: transactionLogger,
 	}
-	if r.shutdown {
+	if r.shutdown.Load() {
 		transaction.completed = true
 		transaction.completionFuture = &completedFuture{errors.New("request transaction manager in shutdown")}
 	}
@@ -239,7 +239,7 @@ func (r *requestTransactionManager) Close() error {
 
 func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 	r.log.Debug().Msgf("closing with a timeout of %s", timeout)
-	r.shutdown = true
+	r.shutdown.Store(true)
 	if timeout > 0 {
 		timer := time.NewTimer(timeout)
 		defer utils.CleanupTimer(timer)
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 4113411419..d858749a97 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -151,14 +151,14 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
-		shutdown                            bool
 		traceTransactionManagerTransactions bool
 	}
 	tests := []struct {
-		name       string
-		fields     fields
-		setup      func(t *testing.T, fields *fields)
-		wantAssert func(t *testing.T, requestTransaction RequestTransaction) bool
+		name        string
+		fields      fields
+		setup       func(t *testing.T, fields *fields)
+		manipulator func(t *testing.T, manager *requestTransactionManager)
+		wantAssert  func(t *testing.T, requestTransaction RequestTransaction) bool
 	}{
 		{
 			name: "start one",
@@ -169,8 +169,8 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 		},
 		{
 			name: "start one in shutdown",
-			fields: fields{
-				shutdown: true,
+			manipulator: func(t *testing.T, manager *requestTransactionManager) {
+				manager.shutdown.Store(true)
 			},
 			wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
 				assert.True(t, requestTransaction.IsCompleted())
@@ -190,10 +190,12 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
-				shutdown:                            tt.fields.shutdown,
 				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
 				log:                                 testutils.ProduceTestingLogger(t),
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, r)
+			}
 			if got := r.StartTransaction(); !assert.True(t, tt.wantAssert(t, got)) {
 				t.Errorf("StartTransaction() = %v", got)
 			}
@@ -448,7 +450,6 @@ func Test_requestTransactionManager_Close(t *testing.T) {
 		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
-		shutdown                            bool
 		traceTransactionManagerTransactions bool
 	}
 	tests := []struct {
@@ -478,7 +479,6 @@ func Test_requestTransactionManager_Close(t *testing.T) {
 				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
-				shutdown:                            tt.fields.shutdown,
 				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
 				log:                                 testutils.ProduceTestingLogger(t),
 			}
@@ -494,7 +494,6 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
 		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
-		shutdown                            bool
 		traceTransactionManagerTransactions bool
 		log                                 zerolog.Logger
 	}
@@ -558,7 +557,6 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
 				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
-				shutdown:                            tt.fields.shutdown,
 				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
 				log:                                 tt.fields.log,
 			}
@@ -574,7 +572,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
 		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
-		shutdown                            bool
 		traceTransactionManagerTransactions bool
 	}
 	tests := []struct {
@@ -598,7 +595,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
 					return v
 				}(),
 				executor:                            pool.NewFixedSizeExecutor(1, 1),
-				shutdown:                            true,
 				traceTransactionManagerTransactions: true,
 			},
 			want: `
@@ -609,7 +605,7 @@ func Test_requestTransactionManager_String(t *testing.T) {
 ║║      ╚══════════════╝╚══════════╝       ║                                                                           ║
 ║╚═════════════════════════════════════════╝                                                                           ║
 ║╔═executor/executor═══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
-║║╔═running╗╔═shutdown╗                                                                                    ║║ b1 true ║║
+║║╔═running╗╔═shutdown╗                                                                                    ║║b0 false ║║
 ║║║b0 false║║b0 false ║                                                                                    ║╚═════════╝║
 ║║╚════════╝╚═════════╝                                                                                    ║           ║
 ║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║           ║
@@ -635,7 +631,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
 				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
-				shutdown:                            tt.fields.shutdown,
 				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
 				log:                                 testutils.ProduceTestingLogger(t),
 			}
diff --git a/plc4go/spi/transactions/requestTransactionManager_plc4xgen.go b/plc4go/spi/transactions/requestTransactionManager_plc4xgen.go
index 9fe567096c..32afc0bf1c 100644
--- a/plc4go/spi/transactions/requestTransactionManager_plc4xgen.go
+++ b/plc4go/spi/transactions/requestTransactionManager_plc4xgen.go
@@ -98,7 +98,7 @@ func (d *requestTransactionManager) SerializeWithWriteBuffer(ctx context.Context
 		}
 	}
 
-	if err := writeBuffer.WriteBit("shutdown", d.shutdown); err != nil {
+	if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
 		return err
 	}