You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 22:00:04 UTC

[plc4x] branch develop updated (5eca78479c -> 32c5531d2f)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 5eca78479c feat(plc4go): add receive timeout option
     new e6c897c97a feat(plc4go/spi): refined logging of Default Connection
     new 32c5531d2f fix(plc4go/spi): potential fix with request transaction manager producing race conditions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/spi/default/DefaultConnection.go            |  7 ++++--
 .../spi/transactions/RequestTransactionManager.go  | 10 ++++----
 .../transactions/RequestTransactionManager_test.go | 27 +++++++++-------------
 .../requestTransactionManager_plc4xgen.go          |  2 +-
 4 files changed, 22 insertions(+), 24 deletions(-)


[plc4x] 02/02: fix(plc4go/spi): potential fix with request transaction manager producing race conditions

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 32c5531d2f65bfe86759f3a8a7fb6a62cc4bc26e
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:59:57 2023 +0200

    fix(plc4go/spi): potential fix with request transaction manager producing race conditions
---
 .../spi/transactions/RequestTransactionManager.go  | 10 ++++----
 .../transactions/RequestTransactionManager_test.go | 27 +++++++++-------------
 .../requestTransactionManager_plc4xgen.go          |  2 +-
 3 files changed, 17 insertions(+), 22 deletions(-)

diff --git a/plc4go/spi/transactions/RequestTransactionManager.go b/plc4go/spi/transactions/RequestTransactionManager.go
index 55cef95f96..4d8105b7f3 100644
--- a/plc4go/spi/transactions/RequestTransactionManager.go
+++ b/plc4go/spi/transactions/RequestTransactionManager.go
@@ -24,10 +24,10 @@ import (
 	"context"
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/pool"
-	"github.com/rs/zerolog/log"
 	"io"
 	"runtime"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/apache/plc4x/plc4go/pkg/api/config"
@@ -44,7 +44,7 @@ func init() {
 		runtime.NumCPU(),
 		100,
 		options.WithExecutorOptionTracerWorkers(config.TraceTransactionManagerWorkers),
-		config.WithCustomLogger(log.With().Str("executorInstance", "shared logger").Logger()),
+		config.WithCustomLogger(zerolog.Nop()),
 	)
 	sharedExecutorInstance.Start()
 	runtime.SetFinalizer(sharedExecutorInstance, func(sharedExecutorInstance pool.Executor) {
@@ -117,7 +117,7 @@ type requestTransactionManager struct {
 
 	executor pool.Executor
 
-	shutdown bool // Indicates it this rtm is in shutdown
+	shutdown atomic.Bool // Indicates if this rtm is in shutdown
 
 	traceTransactionManagerTransactions bool // flag set to true if it should trace transactions
 
@@ -188,7 +188,7 @@ func (r *requestTransactionManager) StartTransaction() RequestTransaction {
 		transactionId:  currentTransactionId,
 		transactionLog: transactionLogger,
 	}
-	if r.shutdown {
+	if r.shutdown.Load() {
 		transaction.completed = true
 		transaction.completionFuture = &completedFuture{errors.New("request transaction manager in shutdown")}
 	}
@@ -239,7 +239,7 @@ func (r *requestTransactionManager) Close() error {
 
 func (r *requestTransactionManager) CloseGraceful(timeout time.Duration) error {
 	r.log.Debug().Msgf("closing with a timeout of %s", timeout)
-	r.shutdown = true
+	r.shutdown.Store(true)
 	if timeout > 0 {
 		timer := time.NewTimer(timeout)
 		defer utils.CleanupTimer(timer)
diff --git a/plc4go/spi/transactions/RequestTransactionManager_test.go b/plc4go/spi/transactions/RequestTransactionManager_test.go
index 4113411419..d858749a97 100644
--- a/plc4go/spi/transactions/RequestTransactionManager_test.go
+++ b/plc4go/spi/transactions/RequestTransactionManager_test.go
@@ -151,14 +151,14 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
-		shutdown                            bool
 		traceTransactionManagerTransactions bool
 	}
 	tests := []struct {
-		name       string
-		fields     fields
-		setup      func(t *testing.T, fields *fields)
-		wantAssert func(t *testing.T, requestTransaction RequestTransaction) bool
+		name        string
+		fields      fields
+		setup       func(t *testing.T, fields *fields)
+		manipulator func(t *testing.T, manager *requestTransactionManager)
+		wantAssert  func(t *testing.T, requestTransaction RequestTransaction) bool
 	}{
 		{
 			name: "start one",
@@ -169,8 +169,8 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 		},
 		{
 			name: "start one in shutdown",
-			fields: fields{
-				shutdown: true,
+			manipulator: func(t *testing.T, manager *requestTransactionManager) {
+				manager.shutdown.Store(true)
 			},
 			wantAssert: func(t *testing.T, requestTransaction RequestTransaction) bool {
 				assert.True(t, requestTransaction.IsCompleted())
@@ -190,10 +190,12 @@ func Test_requestTransactionManager_StartTransaction(t *testing.T) {
 				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
-				shutdown:                            tt.fields.shutdown,
 				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
 				log:                                 testutils.ProduceTestingLogger(t),
 			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, r)
+			}
 			if got := r.StartTransaction(); !assert.True(t, tt.wantAssert(t, got)) {
 				t.Errorf("StartTransaction() = %v", got)
 			}
@@ -448,7 +450,6 @@ func Test_requestTransactionManager_Close(t *testing.T) {
 		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
-		shutdown                            bool
 		traceTransactionManagerTransactions bool
 	}
 	tests := []struct {
@@ -478,7 +479,6 @@ func Test_requestTransactionManager_Close(t *testing.T) {
 				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
-				shutdown:                            tt.fields.shutdown,
 				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
 				log:                                 testutils.ProduceTestingLogger(t),
 			}
@@ -494,7 +494,6 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
 		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
-		shutdown                            bool
 		traceTransactionManagerTransactions bool
 		log                                 zerolog.Logger
 	}
@@ -558,7 +557,6 @@ func Test_requestTransactionManager_CloseGraceful(t *testing.T) {
 				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
-				shutdown:                            tt.fields.shutdown,
 				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
 				log:                                 tt.fields.log,
 			}
@@ -574,7 +572,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
 		currentTransactionId                int32
 		workLog                             list.List
 		executor                            pool.Executor
-		shutdown                            bool
 		traceTransactionManagerTransactions bool
 	}
 	tests := []struct {
@@ -598,7 +595,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
 					return v
 				}(),
 				executor:                            pool.NewFixedSizeExecutor(1, 1),
-				shutdown:                            true,
 				traceTransactionManagerTransactions: true,
 			},
 			want: `
@@ -609,7 +605,7 @@ func Test_requestTransactionManager_String(t *testing.T) {
 ║║      ╚══════════════╝╚══════════╝       ║                                                                           ║
 ║╚═════════════════════════════════════════╝                                                                           ║
 ║╔═executor/executor═══════════════════════════════════════════════════════════════════════════════════════╗╔═shutdown╗║
-║║╔═running╗╔═shutdown╗                                                                                    ║║ b1 true ║║
+║║╔═running╗╔═shutdown╗                                                                                    ║║b0 false ║║
 ║║║b0 false║║b0 false ║                                                                                    ║╚═════════╝║
 ║║╚════════╝╚═════════╝                                                                                    ║           ║
 ║║╔═worker/value/worker═══════════════════════════════════════════════════════════════════════════════════╗║           ║
@@ -635,7 +631,6 @@ func Test_requestTransactionManager_String(t *testing.T) {
 				currentTransactionId:                tt.fields.currentTransactionId,
 				workLog:                             tt.fields.workLog,
 				executor:                            tt.fields.executor,
-				shutdown:                            tt.fields.shutdown,
 				traceTransactionManagerTransactions: tt.fields.traceTransactionManagerTransactions,
 				log:                                 testutils.ProduceTestingLogger(t),
 			}
diff --git a/plc4go/spi/transactions/requestTransactionManager_plc4xgen.go b/plc4go/spi/transactions/requestTransactionManager_plc4xgen.go
index 9fe567096c..32afc0bf1c 100644
--- a/plc4go/spi/transactions/requestTransactionManager_plc4xgen.go
+++ b/plc4go/spi/transactions/requestTransactionManager_plc4xgen.go
@@ -98,7 +98,7 @@ func (d *requestTransactionManager) SerializeWithWriteBuffer(ctx context.Context
 		}
 	}
 
-	if err := writeBuffer.WriteBit("shutdown", d.shutdown); err != nil {
+	if err := writeBuffer.WriteBit("shutdown", d.shutdown.Load()); err != nil {
 		return err
 	}
 


[plc4x] 01/02: feat(plc4go/spi): refined logging of Default Connection

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit e6c897c97a59060cc7074f58ee58198e73b7ab52
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 23:59:10 2023 +0200

    feat(plc4go/spi): refined logging of Default Connection
---
 plc4go/spi/default/DefaultConnection.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/plc4go/spi/default/DefaultConnection.go b/plc4go/spi/default/DefaultConnection.go
index 3ff36e787f..3334de849b 100644
--- a/plc4go/spi/default/DefaultConnection.go
+++ b/plc4go/spi/default/DefaultConnection.go
@@ -288,8 +288,11 @@ func (d *defaultConnection) Close() <-chan plc4go.PlcConnectionCloseResult {
 	var err error
 	if transportInstance := d.GetTransportInstance(); transportInstance != nil {
 		d.log.Trace().Msg("closing transport instance")
-		err = transportInstance.Close()
-		d.log.Trace().Err(err).Msg("transport instance closed")
+		if err = transportInstance.Close(); err != nil {
+			d.log.Warn().Err(err).Msg("Error disconnecting transport instance")
+		} else {
+			d.log.Trace().Msg("transport instance closed")
+		}
 	}
 	d.SetConnected(false)
 	ch := make(chan plc4go.PlcConnectionCloseResult, 1)