You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 12:10:05 UTC

[plc4x] branch develop updated: test(plc4go/cbus): fix concurrency issue with connection test

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 06c79b35c9 test(plc4go/cbus): fix concurrency issue with connection test
06c79b35c9 is described below

commit 06c79b35c9fde10e04b7d89fac7f76fcebbbee04
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 14:09:57 2023 +0200

    test(plc4go/cbus): fix concurrency issue with connection test
---
 plc4go/internal/cbus/Connection.go      |  99 +++++++++---------
 plc4go/internal/cbus/Connection_test.go | 178 ++++++++++++++++++++++++--------
 plc4go/internal/cbus/DriverContext.go   |   5 +-
 3 files changed, 192 insertions(+), 90 deletions(-)

diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 154a2b299a..5b4f767815 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -310,64 +310,71 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 
 	receivedResetEchoChan := make(chan bool, 1)
 	receivedResetEchoErrorChan := make(chan error, 1)
-	if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
-		switch message := message.(type) {
-		case readWriteModel.CBusMessageToClientExactly:
-			switch reply := message.GetReply().(type) {
-			case readWriteModel.ReplyOrConfirmationReplyExactly:
-				switch reply.GetReply().(type) {
-				case readWriteModel.PowerUpReplyExactly:
-					c.log.Debug().Msg("Received a PUN reply")
-					return true
+	if err := c.messageCodec.SendRequest(
+		ctx,
+		cBusMessage,
+		func(message spi.Message) bool {
+			c.log.Trace().Msg("Checking message")
+			switch message := message.(type) {
+			case readWriteModel.CBusMessageToClientExactly:
+				switch reply := message.GetReply().(type) {
+				case readWriteModel.ReplyOrConfirmationReplyExactly:
+					switch reply.GetReply().(type) {
+					case readWriteModel.PowerUpReplyExactly:
+						c.log.Debug().Msg("Received a PUN reply")
+						return true
+					default:
+						c.log.Trace().Msgf("%T not relevant", reply)
+						return false
+					}
 				default:
 					c.log.Trace().Msgf("%T not relevant", reply)
 					return false
 				}
+			case readWriteModel.CBusMessageToServerExactly:
+				switch request := message.GetRequest().(type) {
+				case readWriteModel.RequestResetExactly:
+					c.log.Debug().Msg("Received a Reset reply")
+					return true
+				default:
+					c.log.Trace().Msgf("%T not relevant", request)
+					return false
+				}
 			default:
-				c.log.Trace().Msgf("%T not relevant", reply)
-				return false
-			}
-		case readWriteModel.CBusMessageToServerExactly:
-			switch request := message.GetRequest().(type) {
-			case readWriteModel.RequestResetExactly:
-				c.log.Debug().Msg("Received a Reset reply")
-				return true
-			default:
-				c.log.Trace().Msgf("%T not relevant", request)
+				c.log.Trace().Msgf("%T not relevant", message)
 				return false
 			}
-		default:
-			c.log.Trace().Msgf("%T not relevant", message)
-			return false
-		}
-	}, func(message spi.Message) error {
-		switch message.(type) {
-		case readWriteModel.CBusMessageToClientExactly:
-			// This is the powerup notification
-			select {
-			case receivedResetEchoChan <- false:
-				c.log.Trace().Msg("notified reset chan from message to client")
+		},
+		func(message spi.Message) error {
+			c.log.Trace().Msg("Handling message")
+			switch message.(type) {
+			case readWriteModel.CBusMessageToClientExactly:
+				// This is the powerup notification
+				select {
+				case receivedResetEchoChan <- false:
+					c.log.Trace().Msg("notified reset chan from message to client")
+				default:
+				}
+			case readWriteModel.CBusMessageToServerExactly:
+				// This is the echo
+				select {
+				case receivedResetEchoChan <- true:
+					c.log.Trace().Msg("notified reset chan from message to server")
+				default:
+				}
 			default:
+				return errors.Errorf("Unmapped type %T", message)
 			}
-		case readWriteModel.CBusMessageToServerExactly:
-			// This is the echo
+			return nil
+		},
+		func(err error) error {
 			select {
-			case receivedResetEchoChan <- true:
-				c.log.Trace().Msg("notified reset chan from message to server")
+			case receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request"):
+				c.log.Trace().Msg("notified error chan")
 			default:
 			}
-		default:
-			return errors.Errorf("Unmapped type %T", message)
-		}
-		return nil
-	}, func(err error) error {
-		select {
-		case receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request"):
-			c.log.Trace().Msg("notified error chan")
-		default:
-		}
-		return nil
-	}, c.GetTtl()); err != nil {
+			return nil
+		}, c.GetTtl()); err != nil {
 		if sendOutErrorNotification {
 			c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
 		} else {
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 366d0b22b3..073904751b 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -21,6 +21,7 @@ package cbus
 
 import (
 	"context"
+	"encoding/hex"
 	"github.com/stretchr/testify/require"
 	"net/url"
 	"sync"
@@ -1379,27 +1380,35 @@ func TestConnection_setInterfaceOptions3(t *testing.T) {
 
 func TestConnection_setupConnection(t *testing.T) {
 	type fields struct {
-		DefaultConnection _default.DefaultConnection
-		messageCodec      *MessageCodec
-		subscribers       []*Subscriber
-		tm                transactions.RequestTransactionManager
-		configuration     Configuration
-		driverContext     DriverContext
-		connectionId      string
-		tracer            tracer.Tracer
+		_DefaultConnection _default.DefaultConnection
+		messageCodec       *MessageCodec
+		subscribers        []*Subscriber
+		tm                 transactions.RequestTransactionManager
+		configuration      Configuration
+		driverContext      DriverContext
+		connectionId       string
+		tracer             tracer.Tracer
 	}
 	type args struct {
 		ctx context.Context
 		ch  chan plc4go.PlcConnectionConnectResult
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		args   args
-		setup  func(t *testing.T, fields *fields)
+		name        string
+		fields      fields
+		args        args
+		setup       func(t *testing.T, fields *fields)
+		manipulator func(t *testing.T, connection *Connection)
+		validator   func(t *testing.T, result plc4go.PlcConnectionConnectResult)
 	}{
 		{
 			name: "setup connection (failing)",
+			fields: fields{
+				driverContext: DriverContext{
+					awaitSetupComplete:      true,
+					awaitDisconnectComplete: true,
+				},
+			},
 			args: args{
 				ctx: testutils.TestContext(t),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1407,20 +1416,33 @@ func TestConnection_setupConnection(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				// Setup connection
-				fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
 				transport := test.NewTransport(_options...)
 				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
 				require.NoError(t, err)
 				codec := NewMessageCodec(ti, _options...)
+				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
 					assert.Error(t, codec.Disconnect())
 				})
 				fields.messageCodec = codec
 			},
+			manipulator: func(t *testing.T, connection *Connection) {
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+			},
+			validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+				assert.NotNil(t, result)
+				assert.Error(t, result.GetErr())
+			},
 		},
 		{
 			name: "setup connection (failing after reset)",
+			fields: fields{
+				driverContext: DriverContext{
+					awaitSetupComplete:      true,
+					awaitDisconnectComplete: true,
+				},
+			},
 			args: args{
 				ctx: testutils.TestContext(t),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1428,9 +1450,6 @@ func TestConnection_setupConnection(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				// Build the default connection
-				fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
 				// Build the message codec
 				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
@@ -1464,19 +1483,29 @@ func TestConnection_setupConnection(t *testing.T) {
 
 				fields.messageCodec = codec
 			},
+			manipulator: func(t *testing.T, connection *Connection) {
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+			},
+			validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+				assert.NotNil(t, result)
+				assert.Error(t, result.GetErr())
+			},
 		},
 		{
 			name: "setup connection (failing after app filters)",
+			fields: fields{
+				driverContext: DriverContext{
+					awaitSetupComplete:      true,
+					awaitDisconnectComplete: true,
+				},
+			},
 			args: args{
 				ctx: testutils.TestContext(t),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
 			},
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
-
-				// Build the default connection
-				fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
 				// Build the message codec
 				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
@@ -1522,9 +1551,23 @@ func TestConnection_setupConnection(t *testing.T) {
 
 				fields.messageCodec = codec
 			},
+			manipulator: func(t *testing.T, connection *Connection) {
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+			},
+			validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+				assert.NotNil(t, result)
+				assert.Error(t, result.GetErr())
+			},
 		},
 		{
 			name: "setup connection (failing after interface options 3",
+			fields: fields{
+				driverContext: DriverContext{
+					awaitSetupComplete:      true,
+					awaitDisconnectComplete: true,
+				},
+			},
 			args: args{
 				ctx: testutils.TestContext(t),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1532,9 +1575,6 @@ func TestConnection_setupConnection(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				// Build the default connection
-				fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
 				// Build the message codec
 				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
@@ -1552,6 +1592,7 @@ func TestConnection_setupConnection(t *testing.T) {
 				currentState.Store(RESET)
 				stateChangeMutex := sync.Mutex{}
 				ti.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					t.Logf("Reacting to\n%s", hex.Dump(data))
 					stateChangeMutex.Lock()
 					defer stateChangeMutex.Unlock()
 					switch currentState.Load().(MockState) {
@@ -1586,9 +1627,23 @@ func TestConnection_setupConnection(t *testing.T) {
 
 				fields.messageCodec = codec
 			},
+			manipulator: func(t *testing.T, connection *Connection) {
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+			},
+			validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+				assert.NotNil(t, result)
+				assert.Error(t, result.GetErr())
+			},
 		},
 		{
 			name: "setup connection (failing after interface options 1 pun)",
+			fields: fields{
+				driverContext: DriverContext{
+					awaitSetupComplete:      true,
+					awaitDisconnectComplete: true,
+				},
+			},
 			args: args{
 				ctx: testutils.TestContext(t),
 				ch:  make(chan plc4go.PlcConnectionConnectResult, 1),
@@ -1596,9 +1651,6 @@ func TestConnection_setupConnection(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				// Build the default connection
-				fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
 				// Build the message codec
 				transport := test.NewTransport(_options...)
 				transportUrl := url.URL{Scheme: "test"}
@@ -1656,11 +1708,22 @@ func TestConnection_setupConnection(t *testing.T) {
 
 				fields.messageCodec = codec
 			},
+			manipulator: func(t *testing.T, connection *Connection) {
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+			},
+			validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+				assert.NotNil(t, result)
+				assert.Error(t, result.GetErr())
+			},
 		},
 		{
 			name: "setup connection",
 			fields: fields{
-				DefaultConnection: _default.NewDefaultConnection(nil),
+				driverContext: DriverContext{
+					awaitSetupComplete:      true,
+					awaitDisconnectComplete: true,
+				},
 			},
 			args: args{
 				ctx: testutils.TestContext(t),
@@ -1669,9 +1732,6 @@ func TestConnection_setupConnection(t *testing.T) {
 			setup: func(t *testing.T, fields *fields) {
 				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
 
-				// Build the default connection
-				fields.DefaultConnection = _default.NewDefaultConnection(nil, _options...)
-
 				// Build the message codec
 				transport := test.NewTransport(_options...)
 				ti, err := transport.CreateTransportInstance(url.URL{Scheme: "test"}, nil, _options...)
@@ -1730,10 +1790,19 @@ func TestConnection_setupConnection(t *testing.T) {
 				codec := NewMessageCodec(ti, _options...)
 				require.NoError(t, codec.Connect())
 				t.Cleanup(func() {
-					assert.NoError(t, codec.Disconnect())
+					assert.Error(t, codec.Disconnect())
 				})
 				fields.messageCodec = codec
 			},
+			manipulator: func(t *testing.T, connection *Connection) {
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+				connection.DefaultConnection = _default.NewDefaultConnection(connection, _options...)
+			},
+			validator: func(t *testing.T, result plc4go.PlcConnectionConnectResult) {
+				assert.NotNil(t, result)
+				assert.NoError(t, result.GetErr())
+				assert.NotNil(t, result.GetConnection())
+			},
 		},
 	}
 	for _, tt := range tests {
@@ -1742,20 +1811,43 @@ func TestConnection_setupConnection(t *testing.T) {
 				tt.setup(t, &tt.fields)
 			}
 			c := &Connection{
-				DefaultConnection: tt.fields.DefaultConnection,
-				messageCodec:      tt.fields.messageCodec,
-				subscribers:       tt.fields.subscribers,
-				tm:                tt.fields.tm,
-				configuration:     tt.fields.configuration,
-				driverContext:     tt.fields.driverContext,
-				connectionId:      tt.fields.connectionId,
-				tracer:            tt.fields.tracer,
-				log:               testutils.ProduceTestingLogger(t),
+				messageCodec:  tt.fields.messageCodec,
+				subscribers:   tt.fields.subscribers,
+				tm:            tt.fields.tm,
+				configuration: tt.fields.configuration,
+				driverContext: tt.fields.driverContext,
+				connectionId:  tt.fields.connectionId,
+				tracer:        tt.fields.tracer,
+				log:           testutils.ProduceTestingLogger(t),
+			}
+			if tt.manipulator != nil {
+				tt.manipulator(t, c)
 			}
 			c.setupConnection(tt.args.ctx, tt.args.ch)
+			assert.NotNil(t, tt.args.ch, "We always need a result channel")
+			chanTimeout := time.NewTimer(10 * time.Second)
+			t.Cleanup(func() {
+				utils.CleanupTimer(chanTimeout)
+			})
+			select {
+			case <-chanTimeout.C:
+				t.Fatal("setup connection doesn't fill chan in time")
+			case result := <-tt.args.ch:
+				if tt.validator != nil {
+					tt.validator(t, result)
+				}
+			}
 			// To shut down properly we always do that
-			c.SetConnected(false)
-			c.handlerWaitGroup.Wait()
+			closeTimeout := time.NewTimer(10 * time.Second)
+			t.Cleanup(func() {
+				utils.CleanupTimer(closeTimeout)
+			})
+			select {
+			case <-closeTimeout.C:
+				t.Fatal("close didn't react in time")
+			case <-c.Close():
+				t.Log("connection closed")
+			}
 		})
 	}
 }
diff --git a/plc4go/internal/cbus/DriverContext.go b/plc4go/internal/cbus/DriverContext.go
index a53f2fa8b5..041f4214e2 100644
--- a/plc4go/internal/cbus/DriverContext.go
+++ b/plc4go/internal/cbus/DriverContext.go
@@ -26,5 +26,8 @@ type DriverContext struct {
 }
 
 func NewDriverContext(_ Configuration) DriverContext {
-	return DriverContext{}
+	return DriverContext{
+		awaitSetupComplete:      true,
+		awaitDisconnectComplete: true,
+	}
 }