You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 10:58:25 UTC

[plc4x] 01/02: refactor(plc4go/cbus): split up browser code

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 9036acbbd874341d43665a75aa4c7fec698cb936
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 12:52:25 2023 +0200

    refactor(plc4go/cbus): split up browser code
---
 plc4go/internal/cbus/Browser.go         | 159 ++++++++++++++-------------
 plc4go/internal/cbus/Browser_test.go    | 189 ++++++++++++++++++++++++++++++--
 plc4go/internal/cbus/Connection.go      |   4 +-
 plc4go/internal/cbus/Connection_test.go |   3 -
 4 files changed, 260 insertions(+), 95 deletions(-)

diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 21a26b62df..2c6a20d25b 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"fmt"
 	"runtime/debug"
+	"sync"
 	"time"
 
 	plc4go "github.com/apache/plc4x/plc4go/pkg/api"
@@ -46,107 +47,109 @@ type Browser struct {
 }
 
 func NewBrowser(connection plc4go.PlcConnection, _options ...options.WithOption) *Browser {
-	browser := Browser{
+	browser := &Browser{
 		connection:      connection,
 		sequenceCounter: 0,
 
 		log: options.ExtractCustomLogger(_options...),
 	}
 	browser.DefaultBrowser = _default.NewDefaultBrowser(browser, _options...)
-	return &browser
+	return browser
 }
 
-func (m Browser) BrowseQuery(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query apiModel.PlcQuery) (apiModel.PlcResponseCode, []apiModel.PlcBrowseItem) {
-	var queryResults []apiModel.PlcBrowseItem
+func (m *Browser) BrowseQuery(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query apiModel.PlcQuery) (responseCode apiModel.PlcResponseCode, queryResults []apiModel.PlcBrowseItem) {
 	switch query := query.(type) {
 	case *unitInfoQuery:
-		m.log.Trace().Msg("extract units")
-		units, allUnits, err := m.extractUnits(ctx, query, m.getInstalledUnitAddressBytes)
-		if err != nil {
-			m.log.Error().Err(err).Msg("Error extracting units")
-			return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
-		}
-		attributes, allAttributes := m.extractAttributes(query)
+		return m.browseUnitInfo(ctx, interceptor, queryName, query)
+	default:
+		m.log.Warn().Msgf("unsupported query type supplied %T", query)
+		return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
+	}
+}
 
+func (m *Browser) browseUnitInfo(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query *unitInfoQuery) (responseCode apiModel.PlcResponseCode, queryResults []apiModel.PlcBrowseItem) {
+	m.log.Trace().Msg("extract units")
+	units, allUnits, err := m.extractUnits(ctx, query, m.getInstalledUnitAddressBytes)
+	if err != nil {
+		m.log.Error().Err(err).Msg("Error extracting units")
+		return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
+	}
+	attributes, allAttributes := m.extractAttributes(query)
+
+	if allUnits {
+		m.log.Info().Msg("Querying all (available) units")
+	}
+unitLoop:
+	for _, unit := range units {
+		m.log.Trace().Msgf("checking unit:\n%s", unit)
+		if err := ctx.Err(); err != nil {
+			m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
+			return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
+		}
+		unitAddress := unit.GetAddress()
+		if !allUnits && allAttributes {
+			m.log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
+		}
+		event := m.log.Info()
 		if allUnits {
-			m.log.Info().Msg("Querying all (available) units")
+			event = m.log.Debug()
 		}
-	unitLoop:
-		for _, unit := range units {
-			m.log.Trace().Msgf("checking unit:\n%s", unit)
+		event.Msgf("Query unit  %d", unitAddress)
+		for _, attribute := range attributes {
 			if err := ctx.Err(); err != nil {
 				m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
 				return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
 			}
-			unitAddress := unit.GetAddress()
-			if !allUnits && allAttributes {
-				m.log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
-			}
-			event := m.log.Info()
-			if allUnits {
-				event = m.log.Debug()
+			if !allUnits && !allAttributes {
+				m.log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
+			} else {
+				event.Msgf("unit %d: Query %s", unitAddress, attribute)
 			}
-			event.Msgf("Query unit  %d", unitAddress)
-			for _, attribute := range attributes {
-				if err := ctx.Err(); err != nil {
-					m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
-					return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
-				}
-				if !allUnits && !allAttributes {
-					m.log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
-				} else {
-					event.Msgf("unit %d: Query %s", unitAddress, attribute)
-				}
-				m.log.Trace().Msg("Building request")
-				readTagName := fmt.Sprintf("%s/%d/%s", queryName, unitAddress, attribute)
-				readRequest, _ := m.connection.ReadRequestBuilder().
-					AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
-					Build()
-				timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
-				m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
-				requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
-				m.log.Trace().Msg("got a response")
-				timeoutCancel()
-				if err := requestResult.GetErr(); err != nil {
-					if allUnits || allAttributes {
-						event = m.log.Trace()
-					}
-					event.Err(err).Msgf("unit %d: Can't read attribute %s", unitAddress, attribute)
-					continue unitLoop
-				}
-				response := requestResult.GetResponse()
-				if code := response.GetResponseCode(readTagName); code != apiModel.PlcResponseCode_OK {
-					event.Msgf("unit %d: error reading tag %s. Code %s", unitAddress, attribute, code)
-					continue unitLoop
+			m.log.Trace().Msg("Building request")
+			readTagName := fmt.Sprintf("%s/%d/%s", queryName, unitAddress, attribute)
+			readRequest, _ := m.connection.ReadRequestBuilder().
+				AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
+				Build()
+			timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
+			m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
+			requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
+			m.log.Trace().Msg("got a response")
+			timeoutCancel()
+			if err := requestResult.GetErr(); err != nil {
+				if allUnits || allAttributes {
+					event = m.log.Trace()
 				}
-				queryResult := spiModel.NewDefaultPlcBrowseItem(
-					NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1),
-					queryName,
-					"",
-					true,
-					false,
-					false,
-					nil,
-					map[string]values.PlcValue{
-						"CurrentValue": response.GetValue(readTagName),
-					},
-				)
-				if interceptor != nil {
-					m.log.Trace().Msg("forwarding query result to interceptor")
-					interceptor(queryResult)
-				}
-				queryResults = append(queryResults, queryResult)
+				event.Err(err).Msgf("unit %d: Can't read attribute %s", unitAddress, attribute)
+				continue unitLoop
+			}
+			response := requestResult.GetResponse()
+			if code := response.GetResponseCode(readTagName); code != apiModel.PlcResponseCode_OK {
+				event.Msgf("unit %d: error reading tag %s. Code %s", unitAddress, attribute, code)
+				continue unitLoop
+			}
+			queryResult := spiModel.NewDefaultPlcBrowseItem(
+				NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1),
+				queryName,
+				"",
+				true,
+				false,
+				false,
+				nil,
+				map[string]values.PlcValue{
+					"CurrentValue": response.GetValue(readTagName),
+				},
+			)
+			if interceptor != nil {
+				m.log.Trace().Msg("forwarding query result to interceptor")
+				interceptor(queryResult)
 			}
+			queryResults = append(queryResults, queryResult)
 		}
-	default:
-		m.log.Warn().Msgf("unsupported query type supplied %T", query)
-		return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
 	}
-	m.log.Trace().Msgf("Browse done with \n%s", queryResults)
 	return apiModel.PlcResponseCode_OK, queryResults
 }
 
-func (m Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
+func (m *Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
 	if unitAddress := query.unitAddress; unitAddress != nil {
 		return []readWriteModel.UnitAddress{unitAddress}, false, nil
 	} else {
@@ -167,7 +170,7 @@ func (m Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInst
 	}
 }
 
-func (m Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
+func (m *Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
 	if attribute := query.attribute; attribute != nil {
 		return []readWriteModel.Attribute{*attribute}, false
 	} else {
@@ -179,7 +182,7 @@ func (m Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attri
 	}
 }
 
-func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any, error) {
+func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any, error) {
 	start := time.Now()
 	defer func() {
 		m.log.Debug().Msgf("Ending unit address acquiring after %s", time.Since(start))
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 7794f16574..5cf353d820 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -22,6 +22,7 @@ package cbus
 import (
 	"context"
 	"fmt"
+	"github.com/rs/zerolog"
 	"net/url"
 	"sync"
 	"sync/atomic"
@@ -62,16 +63,16 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 		query       apiModel.PlcQuery
 	}
 	tests := []struct {
-		name   string
-		fields fields
-		args   args
-		setup  func(t *testing.T, fields *fields)
-		want   apiModel.PlcResponseCode
-		want1  []apiModel.PlcBrowseItem
+		name             string
+		fields           fields
+		args             args
+		setup            func(t *testing.T, fields *fields)
+		wantResponseCode apiModel.PlcResponseCode
+		wantQueryResults []apiModel.PlcBrowseItem
 	}{
 		{
-			name: "invalid address",
-			want: apiModel.PlcResponseCode_INVALID_ADDRESS,
+			name:             "invalid address",
+			wantResponseCode: apiModel.PlcResponseCode_INVALID_ADDRESS,
 		},
 		{
 			name: "non responding browse",
@@ -177,8 +178,8 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 					}
 				})
 			},
-			want: apiModel.PlcResponseCode_OK,
-			want1: []apiModel.PlcBrowseItem{
+			wantResponseCode: apiModel.PlcResponseCode_OK,
+			wantQueryResults: []apiModel.PlcBrowseItem{
 				&spiModel.DefaultPlcBrowseItem{
 					Tag:      NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Manufacturer, 1),
 					Name:     "testQuery",
@@ -202,8 +203,172 @@ func TestBrowser_BrowseQuery(t *testing.T) {
 				log:             testutils.ProduceTestingLogger(t),
 			}
 			got, got1 := m.BrowseQuery(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
-			assert.Equalf(t, tt.want, got, "BrowseQuery(%v, func(), %v,\n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
-			assert.Equalf(t, tt.want1, got1, "BrowseQuery(%v, func(), %v, \n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+			assert.Equalf(t, tt.wantResponseCode, got, "BrowseQuery(%v, func(), %v,\n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+			assert.Equalf(t, tt.wantQueryResults, got1, "BrowseQuery(%v, func(), %v, \n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+			if m.connection != nil && m.connection.IsConnected() {
+				t.Log("Closing connection")
+				<-m.connection.Close()
+			}
+		})
+	}
+}
+
+func TestBrowser_browseUnitInfo(t *testing.T) {
+	type fields struct {
+		DefaultBrowser  _default.DefaultBrowser
+		connection      plc4go.PlcConnection
+		sequenceCounter uint8
+		log             zerolog.Logger
+	}
+	type args struct {
+		ctx         context.Context
+		interceptor func(result apiModel.PlcBrowseItem) bool
+		queryName   string
+		query       *unitInfoQuery
+	}
+	tests := []struct {
+		name             string
+		fields           fields
+		args             args
+		setup            func(t *testing.T, fields *fields)
+		wantResponseCode apiModel.PlcResponseCode
+		wantQueryResults []apiModel.PlcBrowseItem
+	}{
+		{
+			name: "non responding browse",
+			args: args{
+				ctx: testutils.TestContext(t),
+				interceptor: func(result apiModel.PlcBrowseItem) bool {
+					// No-OP
+					return true
+				},
+				queryName: "testQuery",
+				query:     NewUnitInfoQuery(readWriteModel.NewUnitAddress(2), nil, 1).(*unitInfoQuery),
+			},
+			setup: func(t *testing.T, fields *fields) {
+				_options := testutils.EnrichOptionsWithOptionsForTesting(t)
+
+				transport := test.NewTransport(_options...)
+				transportUrl := url.URL{Scheme: "test"}
+				transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
+				require.NoError(t, err)
+				t.Cleanup(func() {
+					assert.NoError(t, transportInstance.Close())
+				})
+				type MockState uint8
+				const (
+					RESET MockState = iota
+					APPLICATION_FILTER_1
+					APPLICATION_FILTER_2
+					INTERFACE_OPTIONS_3
+					INTERFACE_OPTIONS_1_PUN
+					INTERFACE_OPTIONS_1
+					MANUFACTURER
+					DONE
+				)
+				currentState := atomic.Value{}
+				currentState.Store(RESET)
+				stateChangeMutex := sync.Mutex{}
+				transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+					stateChangeMutex.Lock()
+					defer stateChangeMutex.Unlock()
+					switch currentState.Load().(MockState) {
+					case RESET:
+						t.Log("Dispatching reset echo")
+						transportInstance.FillReadBuffer([]byte("~~~\r"))
+						currentState.Store(APPLICATION_FILTER_1)
+					case APPLICATION_FILTER_1:
+						t.Log("Dispatching app1 echo and confirm")
+						transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
+						transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
+						currentState.Store(APPLICATION_FILTER_2)
+					case APPLICATION_FILTER_2:
+						t.Log("Dispatching app2 echo and confirm")
+						transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
+						transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
+						currentState.Store(INTERFACE_OPTIONS_3)
+					case INTERFACE_OPTIONS_3:
+						t.Log("Dispatching interface 3 echo and confirm")
+						transportInstance.FillReadBuffer([]byte("@A342000A\r"))
+						transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
+						currentState.Store(INTERFACE_OPTIONS_1_PUN)
+					case INTERFACE_OPTIONS_1_PUN:
+						t.Log("Dispatching interface 1 PUN echo and confirm???")
+						transportInstance.FillReadBuffer([]byte("@A3410079\r"))
+						transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
+						currentState.Store(INTERFACE_OPTIONS_1)
+					case INTERFACE_OPTIONS_1:
+						t.Log("Dispatching interface 1 echo and confirm???")
+						transportInstance.FillReadBuffer([]byte("@A3300079\r"))
+						transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
+						currentState.Store(MANUFACTURER)
+					case MANUFACTURER:
+						t.Log("Dispatching manufacturer")
+						transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
+						currentState.Store(DONE)
+					case DONE:
+						t.Log("Connection dance done")
+						go func() {
+							time.Sleep(200 * time.Millisecond)
+							t.Log("Dispatching 3 MMI segments")
+							transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+							transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+							transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+						}()
+					}
+				})
+				err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
+				require.NoError(t, err)
+				driver := NewDriver(_options...)
+				connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
+				if err := connectionConnectResult.GetErr(); err != nil {
+					t.Error(err)
+					t.FailNow()
+				}
+				fields.connection = connectionConnectResult.GetConnection()
+				t.Cleanup(func() {
+					timer := time.NewTimer(10 * time.Second)
+					t.Cleanup(func() {
+						utils.CleanupTimer(timer)
+					})
+					select {
+					case <-fields.connection.Close():
+					case <-timer.C:
+						t.Error("timeout")
+					}
+				})
+			},
+			wantResponseCode: apiModel.PlcResponseCode_OK,
+			wantQueryResults: []apiModel.PlcBrowseItem{
+				&spiModel.DefaultPlcBrowseItem{
+					Tag:      NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Manufacturer, 1),
+					Name:     "testQuery",
+					Readable: true,
+					Options: map[string]apiValues.PlcValue{
+						"CurrentValue": spiValues.NewPlcSTRING("PC_CNIED"),
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if tt.setup != nil {
+				tt.setup(t, &tt.fields)
+			}
+			m := &Browser{
+				DefaultBrowser:  tt.fields.DefaultBrowser,
+				connection:      tt.fields.connection,
+				sequenceCounter: tt.fields.sequenceCounter,
+				log:             tt.fields.log,
+			}
+			gotResponseCode, gotQueryResults := m.browseUnitInfo(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+			assert.Equalf(t, tt.wantResponseCode, gotResponseCode, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+			assert.Equalf(t, tt.wantQueryResults, gotQueryResults, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+			if m.connection != nil && m.connection.IsConnected() {
+				t.Log("Closing connection")
+				<-m.connection.Close()
+			}
 		})
 	}
 }
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 69ac5d4818..154a2b299a 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -270,7 +270,7 @@ func (c *Connection) startSubscriptionHandler() {
 				}
 			}
 		}
-		salLogger.Info().Msg("Ending SAL handler")
+		salLogger.Info().Msg("handler ended")
 	}()
 	c.log.Debug().Msg("Starting MMI handler")
 	c.handlerWaitGroup.Add(1)
@@ -298,7 +298,7 @@ func (c *Connection) startSubscriptionHandler() {
 				}
 			}
 		}
-		mmiLogger.Info().Msg("Ending MMI handler")
+		mmiLogger.Info().Msg("handler ended")
 	}()
 }
 
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index e4c287bd36..7a51b25a2b 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -219,7 +219,6 @@ func TestConnection_ConnectWithContext(t *testing.T) {
 			}
 			assert.True(t, tt.wantAsserter(t, c.ConnectWithContext(tt.args.ctx)), "ConnectWithContext(%v)", tt.args.ctx)
 			// To shut down properly we always do that
-			time.Sleep(20 * time.Millisecond)
 			c.SetConnected(false)
 			c.handlerWaitGroup.Wait()
 		})
@@ -1755,7 +1754,6 @@ func TestConnection_setupConnection(t *testing.T) {
 			}
 			c.setupConnection(tt.args.ctx, tt.args.ch)
 			// To shut down properly we always do that
-			time.Sleep(20 * time.Millisecond)
 			c.SetConnected(false)
 			c.handlerWaitGroup.Wait()
 		})
@@ -1857,7 +1855,6 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
 			}
 			c.startSubscriptionHandler()
 			// To shut down properly we always do that
-			time.Sleep(20 * time.Millisecond)
 			c.SetConnected(false)
 			c.handlerWaitGroup.Wait()
 		})