You are viewing a plain text version of this content. (The canonical link was not preserved in this plain text copy.)
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 10:58:24 UTC

[plc4x] branch develop updated (708fb98772 -> 82d3246445)

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


    from 708fb98772 test(plc4go/cbus): fix race condition with shutdown
     new 9036acbbd8 refactor(plc4go/cbus): split up browser code
     new 82d3246445 test(plc4go/cbus): ensure we don't have dangling goroutines before ending the test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 plc4go/internal/cbus/Browser.go         | 163 +++++++++++++-------------
 plc4go/internal/cbus/Browser_test.go    | 201 ++++++++++++++++++++++++++++++--
 plc4go/internal/cbus/Connection.go      |   4 +-
 plc4go/internal/cbus/Connection_test.go |  11 +-
 plc4go/internal/cbus/Discoverer_test.go |   8 ++
 5 files changed, 292 insertions(+), 95 deletions(-)


[plc4x] 02/02: test(plc4go/cbus): ensure we don't have dangling goroutines before ending the test

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 82d3246445556ba7be535563a874f907a64d00ee
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 12:58:17 2023 +0200

    test(plc4go/cbus): ensure we don't have dangling goroutines before ending the test
---
 plc4go/internal/cbus/Browser.go         |  4 ++++
 plc4go/internal/cbus/Browser_test.go    | 12 ++++++++++++
 plc4go/internal/cbus/Connection_test.go |  8 ++++++++
 plc4go/internal/cbus/Discoverer_test.go |  8 ++++++++
 4 files changed, 32 insertions(+)

diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 2c6a20d25b..e9b84afcd1 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -296,7 +296,10 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
 	}
 	readCtx, readCtxCancel := context.WithTimeout(ctx, time.Second*2)
 	defer readCtxCancel()
+	readWg := sync.WaitGroup{}
+	readWg.Add(1)
 	go func() {
+		defer readWg.Done()
 		defer func() {
 			if err := recover(); err != nil {
 				m.log.Error().Msgf("panic-ed %v. Stack:\n%s", err, debug.Stack())
@@ -386,5 +389,6 @@ func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]an
 			return nil, errors.Wrap(err, "error waiting for other offsets")
 		}
 	}
+	readWg.Wait()
 	return result, nil
 }
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 5cf353d820..a4fdfb4324 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -148,7 +148,11 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
+						dispatchWg := sync.WaitGroup{}
+						dispatchWg.Add(1)
+						t.Cleanup(dispatchWg.Wait)
 						go func() {
+							defer dispatchWg.Done()
 							time.Sleep(200 * time.Millisecond)
 							t.Log("Dispatching 3 MMI segments")
 							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
@@ -308,7 +312,11 @@ func TestBrowser_browseUnitInfo(t *testing.T) {
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
+						dispatchWg := sync.WaitGroup{}
+						dispatchWg.Add(1)
+						t.Cleanup(dispatchWg.Wait)
 						go func() {
+							defer dispatchWg.Done()
 							time.Sleep(200 * time.Millisecond)
 							t.Log("Dispatching 3 MMI segments")
 							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
@@ -578,7 +586,11 @@ func TestBrowser_getInstalledUnitAddressBytes(t *testing.T) {
 						currentState.Store(DONE)
 					case DONE:
 						t.Log("Connection dance done")
+						dispatchWg := sync.WaitGroup{}
+						dispatchWg.Add(1)
+						t.Cleanup(dispatchWg.Wait)
 						go func() {
+							defer dispatchWg.Done()
 							time.Sleep(200 * time.Millisecond)
 							t.Log("Dispatching 3 MMI segments")
 							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index 7a51b25a2b..366d0b22b3 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -1792,7 +1792,11 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
 				codec := NewMessageCodec(nil, _options...)
 				codec.monitoredMMIs = make(chan readWriteModel.CALReply, 1)
 				codec.monitoredSALs = make(chan readWriteModel.MonitoredSAL, 1)
+				dispatchWg := sync.WaitGroup{}
+				dispatchWg.Add(1)
+				t.Cleanup(dispatchWg.Wait)
 				go func() {
+					defer dispatchWg.Done()
 					codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
 					codec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
 				}()
@@ -1816,7 +1820,11 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
 				fields.subscribers = []*Subscriber{NewSubscriber(nil, options.WithCustomLogger(testutils.ProduceTestingLogger(t)))}
 				codec := NewMessageCodec(nil, _options...)
 				written := make(chan struct{})
+				dispatchWg := sync.WaitGroup{}
+				dispatchWg.Add(1)
+				t.Cleanup(dispatchWg.Wait)
 				go func() {
+					defer dispatchWg.Done()
 					codec.monitoredMMIs <- readWriteModel.NewCALReplyShort(0, nil, nil, nil)
 					codec.monitoredSALs <- readWriteModel.NewMonitoredSAL(0, nil)
 					close(written)
diff --git a/plc4go/internal/cbus/Discoverer_test.go b/plc4go/internal/cbus/Discoverer_test.go
index 4d2a1a3269..5768787c5c 100644
--- a/plc4go/internal/cbus/Discoverer_test.go
+++ b/plc4go/internal/cbus/Discoverer_test.go
@@ -179,7 +179,11 @@ func TestDiscoverer_createDeviceScanDispatcher(t *testing.T) {
 			setup: func(t *testing.T, fields *fields, args *args) {
 				listen, err := net.Listen("tcp", "127.0.0.1:0")
 				require.NoError(t, err)
+				dispatchWg := sync.WaitGroup{}
+				dispatchWg.Add(1)
+				t.Cleanup(dispatchWg.Wait)
 				go func() {
+					defer dispatchWg.Done()
 					conn, err := listen.Accept()
 					if err != nil {
 						t.Error(err)
@@ -263,7 +267,11 @@ func TestDiscoverer_createTransportInstanceDispatcher(t *testing.T) {
 					if err != nil {
 						t.Fatal(err)
 					}
+					dispatchWg := sync.WaitGroup{}
+					dispatchWg.Add(1)
+					t.Cleanup(dispatchWg.Wait)
 					go func() {
+						defer dispatchWg.Done()
 						conn, err := listen.Accept()
 						if err != nil {
 							t.Log(err)


[plc4x] 01/02: refactor(plc4go/cbus): split up browser code

Posted by sr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 9036acbbd874341d43665a75aa4c7fec698cb936
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 12:52:25 2023 +0200

    refactor(plc4go/cbus): split up browser code
---
 plc4go/internal/cbus/Browser.go         | 159 ++++++++++++++-------------
 plc4go/internal/cbus/Browser_test.go    | 189 ++++++++++++++++++++++++++++++--
 plc4go/internal/cbus/Connection.go      |   4 +-
 plc4go/internal/cbus/Connection_test.go |   3 -
 4 files changed, 260 insertions(+), 95 deletions(-)

diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 21a26b62df..2c6a20d25b 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"fmt"
 	"runtime/debug"
+	"sync"
 	"time"
 
 	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
@@ -46,107 +47,109 @@ type Browser struct {
 }
 
 func NewBrowser(connection plc4go.PlcConnection, _options ...options.WithOption) *Browser {
-	browser := Browser{
+	browser := &Browser{
 		connection:      connection,
 		sequenceCounter: 0,
 
 		log: options.ExtractCustomLogger(_options...),
 	}
 	browser.DefaultBrowser = _default.NewDefaultBrowser(browser, _options...)
-	return &browser
+	return browser
 }
 
-func (m Browser) BrowseQuery(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query apiModel.PlcQuery) (apiModel.PlcResponseCode, []apiModel.PlcBrowseItem) {
-	var queryResults []apiModel.PlcBrowseItem
+func (m *Browser) BrowseQuery(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query apiModel.PlcQuery) (responseCode apiModel.PlcResponseCode, queryResults []apiModel.PlcBrowseItem) {
 	switch query := query.(type) {
 	case *unitInfoQuery:
-		m.log.Trace().Msg("extract units")
-		units, allUnits, err := m.extractUnits(ctx, query, m.getInstalledUnitAddressBytes)
-		if err != nil {
-			m.log.Error().Err(err).Msg("Error extracting units")
-			return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
-		}
-		attributes, allAttributes := m.extractAttributes(query)
+		return m.browseUnitInfo(ctx, interceptor, queryName, query)
+	default:
+		m.log.Warn().Msgf("unsupported query type supplied %T", query)
+		return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
+	}
+}
 
+func (m *Browser) browseUnitInfo(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query *unitInfoQuery) (responseCode apiModel.PlcResponseCode, queryResults []apiModel.PlcBrowseItem) {
+	m.log.Trace().Msg("extract units")
+	units, allUnits, err := m.extractUnits(ctx, query, m.getInstalledUnitAddressBytes)
+	if err != nil {
+		m.log.Error().Err(err).Msg("Error extracting units")
+		return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
+	}
+	attributes, allAttributes := m.extractAttributes(query)
+
+	if allUnits {
+		m.log.Info().Msg("Querying all (available) units")
+	}
+unitLoop:
+	for _, unit := range units {
+		m.log.Trace().Msgf("checking unit:\n%s", unit)
+		if err := ctx.Err(); err != nil {
+			m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
+			return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
+		}
+		unitAddress := unit.GetAddress()
+		if !allUnits && allAttributes {
+			m.log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
+		}
+		event := m.log.Info()
 		if allUnits {
-			m.log.Info().Msg("Querying all (available) units")
+			event = m.log.Debug()
 		}
-	unitLoop:
-		for _, unit := range units {
-			m.log.Trace().Msgf("checking unit:\n%s", unit)
+		event.Msgf("Query unit  %d", unitAddress)
+		for _, attribute := range attributes {
 			if err := ctx.Err(); err != nil {
 				m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
 				return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
 			}
-			unitAddress := unit.GetAddress()
-			if !allUnits && allAttributes {
-				m.log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
-			}
-			event := m.log.Info()
-			if allUnits {
-				event = m.log.Debug()
+			if !allUnits && !allAttributes {
+				m.log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
+			} else {
+				event.Msgf("unit %d: Query %s", unitAddress, attribute)
 			}
-			event.Msgf("Query unit  %d", unitAddress)
-			for _, attribute := range attributes {
-				if err := ctx.Err(); err != nil {
-					m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
-					return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
-				}
-				if !allUnits && !allAttributes {
-					m.log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
-				} else {
-					event.Msgf("unit %d: Query %s", unitAddress, attribute)
-				}
-				m.log.Trace().Msg("Building request")
-				readTagName := fmt.Sprintf("%s/%d/%s", queryName, unitAddress, attribute)
-				readRequest, _ := m.connection.ReadRequestBuilder().
-					AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
-					Build()
-				timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
-				m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
-				requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
-				m.log.Trace().Msg("got a response")
-				timeoutCancel()
-				if err := requestResult.GetErr(); err != nil {
-					if allUnits || allAttributes {
-						event = m.log.Trace()
-					}
-					event.Err(err).Msgf("unit %d: Can't read attribute %s", unitAddress, attribute)
-					continue unitLoop
-				}
-				response := requestResult.GetResponse()
-				if code := response.GetResponseCode(readTagName); code != apiModel.PlcResponseCode_OK {
-					event.Msgf("unit %d: error reading tag %s. Code %s", unitAddress, attribute, code)
-					continue unitLoop
+			m.log.Trace().Msg("Building request")
+			readTagName := fmt.Sprintf("%s/%d/%s", queryName, unitAddress, attribute)
+			readRequest, _ := m.connection.ReadRequestBuilder().
+				AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
+				Build()
+			timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
+			m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
+			requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
+			m.log.Trace().Msg("got a response")
+			timeoutCancel()
+			if err := requestResult.GetErr(); err != nil {
+				if allUnits || allAttributes {
+					event = m.log.Trace()
 				}
-				queryResult := spiModel.NewDefaultPlcBrowseItem(
-					NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1),
-					queryName,
-					"",
-					true,
-					false,
-					false,
-					nil,
-					map[string]values.PlcValue{
-						"CurrentValue": response.GetValue(readTagName),
-					},
-				)
-				if interceptor != nil {
-					m.log.Trace().Msg("forwarding query result to interceptor")
-					interceptor(queryResult)
-				}
-				queryResults = append(queryResults, queryResult)
+				event.Err(err).Msgf("unit %d: Can't read attribute %s", unitAddress, attribute)
+				continue unitLoop
+			}
+			response := requestResult.GetResponse()
+			if code := response.GetResponseCode(readTagName); code != apiModel.PlcResponseCode_OK {
+				event.Msgf("unit %d: error reading tag %s. Code %s", unitAddress, attribute, code)
+				continue unitLoop
+			}
+			queryResult := spiModel.NewDefaultPlcBrowseItem(
+				NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1),
+				queryName,
+				"",
+				true,
+				false,
+				false,
+				nil,
+				map[string]values.PlcValue{
+					"CurrentValue": response.GetValue(readTagName),
+				},
+			)
+			if interceptor != nil {
+				m.log.Trace().Msg("forwarding query result to interceptor")
+				interceptor(queryResult)
 			}
+			queryResults = append(queryResults, queryResult)
 		}
-	default:
-		m.log.Warn().Msgf("unsupported query type supplied %T", query)
-		return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
 	}
-	m.log.Trace().Msgf("Browse done with \n%s", queryResults)
 	return apiModel.PlcResponseCode_OK, queryResults
 }
 
-func (m Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
+func (m *Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
 	if unitAddress := query.unitAddress; unitAddress != nil {
 		return []readWriteModel.UnitAddress{unitAddress}, false, nil
 	} else {
@@ -167,7 +170,7 @@ func (m Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInst
 	}
 }
 
-func (m Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
+func (m *Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
 	if attribute := query.attribute; attribute != nil {
 		return []readWriteModel.Attribute{*attribute}, false
 	} else {
@@ -179,7 +182,7 @@ func (m Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attri
 	}
 }
 
-func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any, error) {
+func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any, error) {
 	start := time.Now()
 	defer func() {
 		m.log.Debug().Msgf("Ending unit address acquiring after %s", time.Since(start))
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 7794f16574..5cf353d820 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -22,6 +22,7 @@ package cbus
 import (
 	"context"
 	"fmt"
+	"github.com/rs/zerolog"
 	"net/url"
 	"sync"
 	"sync/atomic"
@@ -62,16 +63,16 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 		query       apiModel.PlcQuery
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		args   args
-		setup  func(t *testing.T, fields *fields)
-		want   apiModel.PlcResponseCode
-		want1  []apiModel.PlcBrowseItem
+		name             string
+		fields           fields
+		args             args
+		setup            func(t *testing.T, fields *fields)
+		wantResponseCode apiModel.PlcResponseCode
+		wantQueryResults []apiModel.PlcBrowseItem
 	}{
 		{
-			name: "invalid address",
-			want: apiModel.PlcResponseCode_INVALID_ADDRESS,
+			name:             "invalid address",
+			wantResponseCode: apiModel.PlcResponseCode_INVALID_ADDRESS,
 		},
 		{
 			name: "non responding browse",
@@ -177,8 +178,8 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 					}
 				})
 			},
-			want: apiModel.PlcResponseCode_OK,
-			want1: []apiModel.PlcBrowseItem{
+			wantResponseCode: apiModel.PlcResponseCode_OK,
+			wantQueryResults: []apiModel.PlcBrowseItem{
 				&spiModel.DefaultPlcBrowseItem{
 					Tag:      NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Manufacturer, 1),
 					Name:     "testQuery",
@@ -202,8 +203,172 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 				log:             testutils.ProduceTestingLogger(t),
 			}
 			got, got1 := m.BrowseQuery(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
-			assert.Equalf(t, tt.want, got, "BrowseQuery(%v, func(), %v,\n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
-			assert.Equalf(t, tt.want1, got1, "BrowseQuery(%v, func(), %v, \n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+			assert.Equalf(t, tt.wantResponseCode, got, "BrowseQuery(%v, func(), %v,\n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+			assert.Equalf(t, tt.wantQueryResults, got1, "BrowseQuery(%v, func(), %v, \n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+			if m.connection != nil && m.connection.IsConnected() {
+				t.Log("Closing connection")
+				<-m.connection.Close()
+			}
+		})
+	}
+}
+
+func TestBrowser_browseUnitInfo(t *testing.T) {
+	type fields struct {
+		DefaultBrowser  _default.DefaultBrowser
+		connection      plc4go.PlcConnection
+		sequenceCounter uint8
+		log             zerolog.Logger
+	}
+	type args struct {
+		ctx         context.Context
+		interceptor func(result apiModel.PlcBrowseItem) bool
+		queryName   string
+		query       *unitInfoQuery
+	}
+	tests := []struct {
+		name             string
+		fields           fields
+		args             args
+		setup            func(t *testing.T, fields *fields)
+		wantResponseCode apiModel.PlcResponseCode
+		wantQueryResults []apiModel.PlcBrowseItem
+	}{
+		{
+			name: "non responding browse",
+			args: args{
+				ctx: testutils.TestContext(t),
+				interceptor: func(result apiModel.PlcBrowseItem) bool {
+					// No-OP
+					return true
+				},
+				queryName: "testQuery",
+				query:     NewUnitInfoQuery(readWriteModel.NewUnitAddress(2), nil, 1).(*unitInfoQuery),
+			},
+			setup: func(t *testing.T, fields *fields) {
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+
+				transport := test.NewTransport(_options...)
+				transportUrl := url.URL{Scheme: "test"}
+				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, transportInstance.Close())
+				})
+				type MockState uint8
+				const (
+					RESET MockState = iota
+					APPLICATION_FILTER_1
+					APPLICATION_FILTER_2
+					INTERFACE_OPTIONS_3
+					INTERFACE_OPTIONS_1_PUN
+					INTERFACE_OPTIONS_1
+					MANUFACTURER
+					DONE
+				)
+				currentState := atomic.Value{}
+				currentState.Store(RESET)
+				stateChangeMutex := sync.Mutex{}
+				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					stateChangeMutex.Lock()
+					defer stateChangeMutex.Unlock()
+					switch currentState.Load().(MockState) {
+					case RESET:
+						t.Log("Dispatching reset echo")
+						transportInstance.FillReadBuffer([]byte("~~~\r"))
+						currentState.Store(APPLICATION_FILTER_1)
+					case APPLICATION_FILTER_1:
+						t.Log("Dispatching app1 echo and confirm")
+						transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
+						transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
+						currentState.Store(APPLICATION_FILTER_2)
+					case APPLICATION_FILTER_2:
+						t.Log("Dispatching app2 echo and confirm")
+						transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
+						transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
+						currentState.Store(INTERFACE_OPTIONS_3)
+					case INTERFACE_OPTIONS_3:
+						t.Log("Dispatching interface 3 echo and confirm")
+						transportInstance.FillReadBuffer([]byte("@A342000A\r"))
+						transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
+						currentState.Store(INTERFACE_OPTIONS_1_PUN)
+					case INTERFACE_OPTIONS_1_PUN:
+						t.Log("Dispatching interface 1 PUN echo and confirm???")
+						transportInstance.FillReadBuffer([]byte("@A3410079\r"))
+						transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
+						currentState.Store(INTERFACE_OPTIONS_1)
+					case INTERFACE_OPTIONS_1:
+						t.Log("Dispatching interface 1 echo and confirm???")
+						transportInstance.FillReadBuffer([]byte("@A3300079\r"))
+						transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
+						currentState.Store(MANUFACTURER)
+					case MANUFACTURER:
+						t.Log("Dispatching manufacturer")
+						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
+						currentState.Store(DONE)
+					case DONE:
+						t.Log("Connection dance done")
+						go func() {
+							time.Sleep(200 * time.Millisecond)
+							t.Log("Dispatching 3 MMI segments")
+							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+							transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+							transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+						}()
+					}
+				})
+				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
+				require.NoError(t, err)
+				driver := NewDriver(_options...)
+				connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
+				if err := connectionConnectResult.GetErr(); err != nil {
+					t.Error(err)
+					t.FailNow()
+				}
+				fields.connection = connectionConnectResult.GetConnection()
+				t.Cleanup(func() {
+					timer := time.NewTimer(10 * time.Second)
+					t.Cleanup(func() {
+						utils.CleanupTimer(timer)
+					})
+					select {
+					case <-fields.connection.Close():
+					case <-timer.C:
+						t.Error("timeout")
+					}
+				})
+			},
+			wantResponseCode: apiModel.PlcResponseCode_OK,
+			wantQueryResults: []apiModel.PlcBrowseItem{
+				&spiModel.DefaultPlcBrowseItem{
+					Tag:      NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Manufacturer, 1),
+					Name:     "testQuery",
+					Readable: true,
+					Options: map[string]apiValues.PlcValue{
+						"CurrentValue": spiValues.NewPlcSTRING("PC_CNIED"),
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			m := &Browser{
+				DefaultBrowser:  tt.fields.DefaultBrowser,
+				connection:      tt.fields.connection,
+				sequenceCounter: tt.fields.sequenceCounter,
+				log:             tt.fields.log,
+			}
+			gotResponseCode, gotQueryResults := m.browseUnitInfo(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+			assert.Equalf(t, tt.wantResponseCode, gotResponseCode, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+			assert.Equalf(t, tt.wantQueryResults, gotQueryResults, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+			if m.connection != nil && m.connection.IsConnected() {
+				t.Log("Closing connection")
+				<-m.connection.Close()
+			}
 		})
 	}
 }
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 69ac5d4818..154a2b299a 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -270,7 +270,7 @@ func (c *Connection) startSubscriptionHandler() {
 				}
 			}
 		}
-		salLogger.Info().Msg("Ending SAL handler")
+		salLogger.Info().Msg("handler ended")
 	}()
 	c.log.Debug().Msg("Starting MMI handler")
 	c.handlerWaitGroup.Add(1)
@@ -298,7 +298,7 @@ func (c *Connection) startSubscriptionHandler() {
 				}
 			}
 		}
-		mmiLogger.Info().Msg("Ending MMI handler")
+		mmiLogger.Info().Msg("handler ended")
 	}()
 }
 
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index e4c287bd36..7a51b25a2b 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -219,7 +219,6 @@ func TestConnection_ConnectWithContext(t *testing.T) {
 			}
 			assert.True(t, tt.wantAsserter(t, c.ConnectWithContext(tt.args.ctx)), "ConnectWithContext(%v)", tt.args.ctx)
 			// To shut down properly we always do that
-			time.Sleep(20 * time.Millisecond)
 			c.SetConnected(false)
 			c.handlerWaitGroup.Wait()
 		})
@@ -1755,7 +1754,6 @@ func TestConnection_setupConnection(t *testing.T) {
 			}
 			c.setupConnection(tt.args.ctx, tt.args.ch)
 			// To shut down properly we always do that
-			time.Sleep(20 * time.Millisecond)
 			c.SetConnected(false)
 			c.handlerWaitGroup.Wait()
 		})
@@ -1857,7 +1855,6 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
 			}
 			c.startSubscriptionHandler()
 			// To shut down properly we always do that
-			time.Sleep(20 * time.Millisecond)
 			c.SetConnected(false)
 			c.handlerWaitGroup.Wait()
 		})