You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 10:58:26 UTC

[plc4x] 02/02: test(plc4go/cbus): ensure we don't have dangling goroutines before ending the test

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 82d3246445556ba7be535563a874f907a64d00ee
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 12:58:17 2023 +0200

    test(plc4go/cbus): ensure we don't have dangling goroutines before ending the test
---
 plc4go/internal/cbus/Browser.go         |  4 ++++
 plc4go/internal/cbus/Browser_test.go    | 12 ++++++++++++
 plc4go/internal/cbus/Connection_test.go |  8 ++++++++
 plc4go/internal/cbus/Discoverer_test.go |  8 ++++++++
 4 files changed, 32 insertions(+)

diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 2c6a20d25b..e9b84afcd1 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -296,7 +296,10 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
 	}
 	readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
 	defer readCtxCancel()
+	readWg := sync.WaitGroup{}
+	readWg.Add(1)
 	go func() {
+		defer readWg.Done()
 		defer func() {
 			if err := recover(); err != nil {
 				m.log.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
@@ -386,5 +389,6 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
 			return nil, errors.Wrap(err, "error waiting for other offsets")
 		}
 	}
+	readWg.Wait()
 	return result, nil
 }
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 5cf353d820..a4fdfb4324 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -148,7 +148,11 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
+						dispatchWg := sync.WaitGroup{}
+						dispatchWg.Add(1)
+						t.Cleanup(dispatchWg.Wait)
 						go func() {
+							defer dispatchWg.Done()
 							time.Sleep(200 * time.Millisecond)
 							t.Log("Dispatching 3 MMI segments")
 							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
@@ -308,7 +312,11 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
+						dispatchWg := sync.WaitGroup{}
+						dispatchWg.Add(1)
+						t.Cleanup(dispatchWg.Wait)
 						go func() {
+							defer dispatchWg.Done()
 							time.Sleep(200 * time.Millisecond)
 							t.Log("Dispatching 3 MMI segments")
 							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
@@ -578,7 +586,11 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
+						dispatchWg := sync.WaitGroup{}
+						dispatchWg.Add(1)
+						t.Cleanup(dispatchWg.Wait)
 						go func() {
+							defer dispatchWg.Done()
 							time.Sleep(200 * time.Millisecond)
 							t.Log("Dispatching 3 MMI segments")
 							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 7a51b25a2b..366d0b22b3 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -1792,7 +1792,11 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
 				codec := NewMessageCodec(nil, _options...)
 				codec.monitoredMMIs = make(chan readWriteModel.CALReply, 1)
 				codec.monitoredSALs = make(chan readWriteModel.MonitoredSAL, 1)
+				dispatchWg := sync.WaitGroup{}
+				dispatchWg.Add(1)
+				t.Cleanup(dispatchWg.Wait)
 				go func() {
+					defer dispatchWg.Done()
 					codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
 					codec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
 				}()
@@ -1816,7 +1820,11 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
 				fields.subscribers = []*Subscriber{NewSubscriber(nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))}
 				codec := NewMessageCodec(nil, _options...)
 				written := make(chan struct{})
+				dispatchWg := sync.WaitGroup{}
+				dispatchWg.Add(1)
+				t.Cleanup(dispatchWg.Wait)
 				go func() {
+					defer dispatchWg.Done()
 					codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
 					codec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
 					close(written)
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 4d2a1a3269..5768787c5c 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -179,7 +179,11 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 			setup: func(t *testing.T, fields *fields, args *args) {
 				listen, err := net.Listen("tcp", "127.0.0.1:0")
 				require.NoError(t, err)
+				dispatchWg := sync.WaitGroup{}
+				dispatchWg.Add(1)
+				t.Cleanup(dispatchWg.Wait)
 				go func() {
+					defer dispatchWg.Done()
 					conn, err := listen.Accept()
 					if err != nil {
 						t.Error(err)
@@ -263,7 +267,11 @@ func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
 					if err != nil {
 						t.Fatal(err)
 					}
+					dispatchWg := sync.WaitGroup{}
+					dispatchWg.Add(1)
+					t.Cleanup(dispatchWg.Wait)
 					go func() {
+						defer dispatchWg.Done()
 						conn, err := listen.Accept()
 						if err != nil {
 							t.Log(err)