You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/02 08:19:10 UTC

[plc4x] branch develop updated: test(plc4go/cbus): fix concurrency issue in test.

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new 80a0d8ef1b test(plc4go/cbus): fix concurrency issue in test.
80a0d8ef1b is described below

commit 80a0d8ef1b6e83b10fac637d87f87aec91ebe088
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 2 10:18:52 2023 +0200

    test(plc4go/cbus): fix concurrency issue in test.
    
    Transition away from using WaitGroup on addResponseCode or addPlcValue and hook a chan close into transaction end with EndTransaction or FailTransaction as this is really the marker which notes if the transaction is ended (what we looking for at the end). This has two benefits: 1. No more flaky tests and 2. if we forget to properly end a transaction we will notice and need to fix that in prod code.
---
 plc4go/internal/cbus/Reader.go      |   5 +-
 plc4go/internal/cbus/Reader_test.go | 146 +++++++++++++++++-------------------
 2 files changed, 73 insertions(+), 78 deletions(-)

diff --git a/plc4go/internal/cbus/Reader.go b/plc4go/internal/cbus/Reader.go
index 0b3cedc4ff..2382a20e1b 100644
--- a/plc4go/internal/cbus/Reader.go
+++ b/plc4go/internal/cbus/Reader.go
@@ -24,6 +24,7 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/options"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/rs/zerolog"
+	"github.com/rs/zerolog/log"
 	"sync"
 	"time"
 
@@ -200,7 +201,9 @@ func (m *Reader) sendMessageOverTheWire(ctx context.Context, transaction transac
 		// TODO: check if we can use a plcValueSerializer
 		encodedReply := embeddedReply.GetReply().(readWriteModel.ReplyEncodedReply).GetEncodedReply()
 		if err := MapEncodedReply(m.log, transaction, encodedReply, tagName, addResponseCode, addPlcValue); err != nil {
-			return errors.Wrap(err, "error encoding reply")
+			log.Error().Err(err).Msg("error encoding reply")
+			addResponseCode(tagName, apiModel.PlcResponseCode_INTERNAL_ERROR)
+			return transaction.EndRequest()
 		}
 		return transaction.EndRequest()
 	}, func(err error) error {
diff --git a/plc4go/internal/cbus/Reader_test.go b/plc4go/internal/cbus/Reader_test.go
index bb93973550..b5baf829e0 100644
--- a/plc4go/internal/cbus/Reader_test.go
+++ b/plc4go/internal/cbus/Reader_test.go
@@ -29,12 +29,12 @@ import (
 	"github.com/apache/plc4x/plc4go/spi/testutils"
 	"github.com/apache/plc4x/plc4go/spi/transactions"
 	"github.com/apache/plc4x/plc4go/spi/transports/test"
+	"github.com/apache/plc4x/plc4go/spi/utils"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"net/url"
 	"strings"
-	"sync"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -405,16 +405,15 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 		ctx             context.Context
 		transaction     transactions.RequestTransaction
 		messageToSend   readWriteModel.CBusMessage
-		addResponseCode func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode)
+		addResponseCode func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode)
 		tagName         string
-		addPlcValue     func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue)
+		addPlcValue     func(t *testing.T) func(name string, plcValue apiValues.PlcValue)
 	}
 	tests := []struct {
 		name   string
 		fields fields
 		args   args
-		setup  func(t *testing.T, fields *fields, args *args)
-		wg     *sync.WaitGroup
+		setup  func(t *testing.T, fields *fields, args *args, ch chan struct{})
 	}{
 		{
 			name: "Send message empty message",
@@ -428,23 +427,21 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					return timeout
 				}(),
 				messageToSend: nil,
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_INTERNAL_ERROR, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
 
 				loggerOption := options.WithCustomLogger(testutils.ProduceTestingLogger(t))
@@ -471,10 +468,11 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.FailRequest(mock.Anything).Return(errors.New("no I say"))
+				expect.FailRequest(mock.Anything).Return(errors.New("no I say")).Run(func(_ error) {
+					close(ch)
+				})
 				args.transaction = transaction
 			},
-			wg: &sync.WaitGroup{},
 		},
 		{
 			name: "Send message which responds with message to client",
@@ -543,31 +541,30 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					nil,
 					nil,
 				),
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_REQUEST_TIMEOUT, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
 
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.FailRequest(mock.Anything).Return(errors.New("Nope"))
+				expect.FailRequest(mock.Anything).Return(errors.New("Nope")).Run(func(_ error) {
+					close(ch)
+				})
 				args.transaction = transaction
 			},
-			wg: &sync.WaitGroup{},
 		},
 		{
 			name: "Send message which responds with server error",
@@ -636,31 +633,30 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					nil,
 					nil,
 				),
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				testutils.SetToTestingLogger(t, readWriteModel.Plc4xModelLog)
 
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.EndRequest().Return(nil)
+				expect.EndRequest().Return(nil).Run(func() {
+					close(ch)
+				})
 				args.transaction = transaction
 			},
-			wg: &sync.WaitGroup{},
 		},
 		{
 			name: "Send message which responds with too many retransmissions",
@@ -692,26 +688,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					nil,
 					nil,
 				),
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_REMOTE_ERROR, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.EndRequest().Return(nil)
+				expect.EndRequest().Return(nil).Run(func() {
+					close(ch)
+				})
 				args.transaction = transaction
 
 				// Setup logger
@@ -759,7 +755,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				}
 				fields.messageCodec = codec
 			},
-			wg: &sync.WaitGroup{},
 		},
 		{
 			name: "Send message which responds with corruption",
@@ -791,26 +786,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					nil,
 					nil,
 				),
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.EndRequest().Return(nil)
+				expect.EndRequest().Return(nil).Run(func() {
+					close(ch)
+				})
 				args.transaction = transaction
 
 				// Setup logger
@@ -858,7 +853,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				}
 				fields.messageCodec = codec
 			},
-			wg: &sync.WaitGroup{},
 		},
 		{
 			name: "Send message which responds with sync loss",
@@ -890,26 +884,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					nil,
 					nil,
 				),
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_REMOTE_BUSY, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.EndRequest().Return(nil)
+				expect.EndRequest().Return(nil).Run(func() {
+					close(ch)
+				})
 				args.transaction = transaction
 
 				// Setup logger
@@ -957,7 +951,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				}
 				fields.messageCodec = codec
 			},
-			wg: &sync.WaitGroup{},
 		},
 		{
 			name: "Send message which responds with too long",
@@ -989,26 +982,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					nil,
 					nil,
 				),
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_INVALID_DATA, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.EndRequest().Return(nil)
+				expect.EndRequest().Return(nil).Run(func() {
+					close(ch)
+				})
 				args.transaction = transaction
 
 				// Setup logger
@@ -1056,7 +1049,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				}
 				fields.messageCodec = codec
 			},
-			wg: &sync.WaitGroup{},
 		},
 		{
 			name: "Send message which responds with confirm only",
@@ -1088,26 +1080,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					nil,
 					nil,
 				),
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_NOT_FOUND, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.EndRequest().Return(nil)
+				expect.EndRequest().Return(nil).Run(func() {
+					close(ch)
+				})
 				args.transaction = transaction
 
 				// Setup logger
@@ -1155,7 +1147,6 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				}
 				fields.messageCodec = codec
 			},
-			wg: &sync.WaitGroup{},
 		},
 		{
 			name: "Send message which responds with ok",
@@ -1187,26 +1178,26 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 					nil,
 					nil,
 				),
-				addResponseCode: func(t *testing.T, wg *sync.WaitGroup) func(name string, responseCode apiModel.PlcResponseCode) {
+				addResponseCode: func(t *testing.T) func(name string, responseCode apiModel.PlcResponseCode) {
 					return func(name string, responseCode apiModel.PlcResponseCode) {
 						t.Logf("Got response code %s for %s", responseCode, name)
 						assert.Equal(t, "horst", name)
 						assert.Equal(t, apiModel.PlcResponseCode_OK, responseCode)
-						wg.Done()
 					}
 				},
 				tagName: "horst",
-				addPlcValue: func(t *testing.T, wg *sync.WaitGroup) func(name string, plcValue apiValues.PlcValue) {
+				addPlcValue: func(t *testing.T) func(name string, plcValue apiValues.PlcValue) {
 					return func(name string, plcValue apiValues.PlcValue) {
 						t.Logf("Got response %s for %s", plcValue, name)
-						wg.Done()
 					}
 				},
 			},
-			setup: func(t *testing.T, fields *fields, args *args) {
+			setup: func(t *testing.T, fields *fields, args *args, ch chan struct{}) {
 				transaction := NewMockRequestTransaction(t)
 				expect := transaction.EXPECT()
-				expect.EndRequest().Return(nil)
+				expect.EndRequest().Return(nil).Run(func() {
+					close(ch)
+				})
 				args.transaction = transaction
 
 				// Setup logger
@@ -1254,28 +1245,29 @@ func TestReader_sendMessageOverTheWire(t *testing.T) {
 				}
 				fields.messageCodec = codec
 			},
-			wg: func() *sync.WaitGroup {
-				wg := &sync.WaitGroup{}
-				wg.Add(1) // We getting an response and a value
-				return wg
-			}(),
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			ch := make(chan struct{})
 			if tt.setup != nil {
-				tt.setup(t, &tt.fields, &tt.args)
+				tt.setup(t, &tt.fields, &tt.args, ch)
 			}
 			m := &Reader{
 				alphaGenerator: tt.fields.alphaGenerator,
 				messageCodec:   tt.fields.messageCodec,
 				tm:             tt.fields.tm,
 			}
-			tt.wg.Add(1)
-			m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t, tt.wg), tt.args.tagName, tt.args.addPlcValue(t, tt.wg))
+			m.sendMessageOverTheWire(tt.args.ctx, tt.args.transaction, tt.args.messageToSend, tt.args.addResponseCode(t), tt.args.tagName, tt.args.addPlcValue(t))
 			t.Log("Waiting now")
-			tt.wg.Wait() // TODO: we need to timeout this too
-			t.Log("Done waiting")
+			timer := time.NewTimer(3 * time.Second)
+			defer utils.CleanupTimer(timer)
+			select {
+			case <-ch:
+				t.Log("Done waiting")
+			case <-timer.C:
+				t.Error("Timeout")
+			}
 		})
 	}
 }