You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 10:58:26 UTC
[plc4x] 02/02: test(plc4go/cbus): ensure we don't have dangling goroutines before ending the test
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 82d3246445556ba7be535563a874f907a64d00ee
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 12:58:17 2023 +0200
test(plc4go/cbus): ensure we don't have dangling goroutines before ending the test
---
plc4go/internal/cbus/Browser.go | 4 ++++
plc4go/internal/cbus/Browser_test.go | 12 ++++++++++++
plc4go/internal/cbus/Connection_test.go | 8 ++++++++
plc4go/internal/cbus/Discoverer_test.go | 8 ++++++++
4 files changed, 32 insertions(+)
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 2c6a20d25b..e9b84afcd1 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -296,7 +296,10 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
}
readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
defer readCtxCancel()
+ readWg := sync.WaitGroup{}
+ readWg.Add(1)
go func() {
+ defer readWg.Done()
defer func() {
if err := recover(); err != nil {
m.log.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
@@ -386,5 +389,6 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
return nil, errors.Wrap(err, "error waiting for other offsets")
}
}
+ readWg.Wait()
return result, nil
}
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 5cf353d820..a4fdfb4324 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -148,7 +148,11 @@ func TestBrowser_BrowseQuery(t *testing.T) {
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
+ dispatchWg := sync.WaitGroup{}
+ dispatchWg.Add(1)
+ t.Cleanup(dispatchWg.Wait)
go func() {
+ defer dispatchWg.Done()
time.Sleep(200 * time.Millisecond)
t.Log("Dispatching 3 MMI segments")
transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
@@ -308,7 +312,11 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
+ dispatchWg := sync.WaitGroup{}
+ dispatchWg.Add(1)
+ t.Cleanup(dispatchWg.Wait)
go func() {
+ defer dispatchWg.Done()
time.Sleep(200 * time.Millisecond)
t.Log("Dispatching 3 MMI segments")
transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
@@ -578,7 +586,11 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
currentState.Store(DONE)
case DONE:
t.Log("Connection dance done")
+ dispatchWg := sync.WaitGroup{}
+ dispatchWg.Add(1)
+ t.Cleanup(dispatchWg.Wait)
go func() {
+ defer dispatchWg.Done()
time.Sleep(200 * time.Millisecond)
t.Log("Dispatching 3 MMI segments")
transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 7a51b25a2b..366d0b22b3 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -1792,7 +1792,11 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
codec := NewMessageCodec(nil, _options...)
codec.monitoredMMIs = make(chan readWriteModel.CALReply, 1)
codec.monitoredSALs = make(chan readWriteModel.MonitoredSAL, 1)
+ dispatchWg := sync.WaitGroup{}
+ dispatchWg.Add(1)
+ t.Cleanup(dispatchWg.Wait)
go func() {
+ defer dispatchWg.Done()
codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
codec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
}()
@@ -1816,7 +1820,11 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
fields.subscribers = []*Subscriber{NewSubscriber(nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))}
codec := NewMessageCodec(nil, _options...)
written := make(chan struct{})
+ dispatchWg := sync.WaitGroup{}
+ dispatchWg.Add(1)
+ t.Cleanup(dispatchWg.Wait)
go func() {
+ defer dispatchWg.Done()
codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
codec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
close(written)
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 4d2a1a3269..5768787c5c 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -179,7 +179,11 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
setup: func(t *testing.T, fields *fields, args *args) {
listen, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
+ dispatchWg := sync.WaitGroup{}
+ dispatchWg.Add(1)
+ t.Cleanup(dispatchWg.Wait)
go func() {
+ defer dispatchWg.Done()
conn, err := listen.Accept()
if err != nil {
t.Error(err)
@@ -263,7 +267,11 @@ func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
+ dispatchWg := sync.WaitGroup{}
+ dispatchWg.Add(1)
+ t.Cleanup(dispatchWg.Wait)
go func() {
+ defer dispatchWg.Done()
conn, err := listen.Accept()
if err != nil {
t.Log(err)