You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this text copy.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2023/06/16 10:58:25 UTC
[plc4x] 01/02: refactor(plc4go/cbus): split up browser code
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 9036acbbd874341d43665a75aa4c7fec698cb936
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Fri Jun 16 12:52:25 2023 +0200
refactor(plc4go/cbus): split up browser code
---
plc4go/internal/cbus/Browser.go | 159 ++++++++++++++-------------
plc4go/internal/cbus/Browser_test.go | 189 ++++++++++++++++++++++++++++++--
plc4go/internal/cbus/Connection.go | 4 +-
plc4go/internal/cbus/Connection_test.go | 3 -
4 files changed, 260 insertions(+), 95 deletions(-)
diff --git a/plc4go/internal/cbus/Browser.go b/plc4go/internal/cbus/Browser.go
index 21a26b62df..2c6a20d25b 100644
--- a/plc4go/internal/cbus/Browser.go
+++ b/plc4go/internal/cbus/Browser.go
@@ -23,6 +23,7 @@ import (
"context"
"fmt"
"runtime/debug"
+ "sync"
"time"
plc4go "github.com/apache/plc4x/plc4go/pkg/api"
@@ -46,107 +47,109 @@ type Browser struct {
}
func NewBrowser(connection plc4go.PlcConnection, _options ...options.WithOption) *Browser {
- browser := Browser{
+ browser := &Browser{
connection: connection,
sequenceCounter: 0,
log: options.ExtractCustomLogger(_options...),
}
browser.DefaultBrowser = _default.NewDefaultBrowser(browser, _options...)
- return &browser
+ return browser
}
-func (m Browser) BrowseQuery(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query apiModel.PlcQuery) (apiModel.PlcResponseCode, []apiModel.PlcBrowseItem) {
- var queryResults []apiModel.PlcBrowseItem
+func (m *Browser) BrowseQuery(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query apiModel.PlcQuery) (responseCode apiModel.PlcResponseCode, queryResults []apiModel.PlcBrowseItem) {
switch query := query.(type) {
case *unitInfoQuery:
- m.log.Trace().Msg("extract units")
- units, allUnits, err := m.extractUnits(ctx, query, m.getInstalledUnitAddressBytes)
- if err != nil {
- m.log.Error().Err(err).Msg("Error extracting units")
- return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
- }
- attributes, allAttributes := m.extractAttributes(query)
+ return m.browseUnitInfo(ctx, interceptor, queryName, query)
+ default:
+ m.log.Warn().Msgf("unsupported query type supplied %T", query)
+ return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
+ }
+}
+func (m *Browser) browseUnitInfo(ctx context.Context, interceptor func(result apiModel.PlcBrowseItem) bool, queryName string, query *unitInfoQuery) (responseCode apiModel.PlcResponseCode, queryResults []apiModel.PlcBrowseItem) {
+ m.log.Trace().Msg("extract units")
+ units, allUnits, err := m.extractUnits(ctx, query, m.getInstalledUnitAddressBytes)
+ if err != nil {
+ m.log.Error().Err(err).Msg("Error extracting units")
+ return apiModel.PlcResponseCode_INTERNAL_ERROR, nil
+ }
+ attributes, allAttributes := m.extractAttributes(query)
+
+ if allUnits {
+ m.log.Info().Msg("Querying all (available) units")
+ }
+unitLoop:
+ for _, unit := range units {
+ m.log.Trace().Msgf("checking unit:\n%s", unit)
+ if err := ctx.Err(); err != nil {
+ m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
+ return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
+ }
+ unitAddress := unit.GetAddress()
+ if !allUnits && allAttributes {
+ m.log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
+ }
+ event := m.log.Info()
if allUnits {
- m.log.Info().Msg("Querying all (available) units")
+ event = m.log.Debug()
}
- unitLoop:
- for _, unit := range units {
- m.log.Trace().Msgf("checking unit:\n%s", unit)
+ event.Msgf("Query unit %d", unitAddress)
+ for _, attribute := range attributes {
if err := ctx.Err(); err != nil {
m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
- unitAddress := unit.GetAddress()
- if !allUnits && allAttributes {
- m.log.Info().Msgf("Querying all attributes of unit %d", unitAddress)
- }
- event := m.log.Info()
- if allUnits {
- event = m.log.Debug()
+ if !allUnits && !allAttributes {
+ m.log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
+ } else {
+ event.Msgf("unit %d: Query %s", unitAddress, attribute)
}
- event.Msgf("Query unit %d", unitAddress)
- for _, attribute := range attributes {
- if err := ctx.Err(); err != nil {
- m.log.Info().Err(err).Msgf("Aborting scan at unit %s", unit)
- return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
- }
- if !allUnits && !allAttributes {
- m.log.Info().Msgf("Querying attribute %s of unit %d", attribute, unitAddress)
- } else {
- event.Msgf("unit %d: Query %s", unitAddress, attribute)
- }
- m.log.Trace().Msg("Building request")
- readTagName := fmt.Sprintf("%s/%d/%s", queryName, unitAddress, attribute)
- readRequest, _ := m.connection.ReadRequestBuilder().
- AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
- Build()
- timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
- m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
- requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
- m.log.Trace().Msg("got a response")
- timeoutCancel()
- if err := requestResult.GetErr(); err != nil {
- if allUnits || allAttributes {
- event = m.log.Trace()
- }
- event.Err(err).Msgf("unit %d: Can't read attribute %s", unitAddress, attribute)
- continue unitLoop
- }
- response := requestResult.GetResponse()
- if code := response.GetResponseCode(readTagName); code != apiModel.PlcResponseCode_OK {
- event.Msgf("unit %d: error reading tag %s. Code %s", unitAddress, attribute, code)
- continue unitLoop
+ m.log.Trace().Msg("Building request")
+ readTagName := fmt.Sprintf("%s/%d/%s", queryName, unitAddress, attribute)
+ readRequest, _ := m.connection.ReadRequestBuilder().
+ AddTag(readTagName, NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1)).
+ Build()
+ timeoutCtx, timeoutCancel := context.WithTimeout(ctx, time.Second*2)
+ m.log.Trace().Msgf("Executing readRequest\n%s\nwith timeout %s", readRequest, timeoutCtx)
+ requestResult := <-readRequest.ExecuteWithContext(timeoutCtx)
+ m.log.Trace().Msg("got a response")
+ timeoutCancel()
+ if err := requestResult.GetErr(); err != nil {
+ if allUnits || allAttributes {
+ event = m.log.Trace()
}
- queryResult := spiModel.NewDefaultPlcBrowseItem(
- NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1),
- queryName,
- "",
- true,
- false,
- false,
- nil,
- map[string]values.PlcValue{
- "CurrentValue": response.GetValue(readTagName),
- },
- )
- if interceptor != nil {
- m.log.Trace().Msg("forwarding query result to interceptor")
- interceptor(queryResult)
- }
- queryResults = append(queryResults, queryResult)
+ event.Err(err).Msgf("unit %d: Can't read attribute %s", unitAddress, attribute)
+ continue unitLoop
+ }
+ response := requestResult.GetResponse()
+ if code := response.GetResponseCode(readTagName); code != apiModel.PlcResponseCode_OK {
+ event.Msgf("unit %d: error reading tag %s. Code %s", unitAddress, attribute, code)
+ continue unitLoop
+ }
+ queryResult := spiModel.NewDefaultPlcBrowseItem(
+ NewCALIdentifyTag(unit, nil /*TODO: add bridge support*/, attribute, 1),
+ queryName,
+ "",
+ true,
+ false,
+ false,
+ nil,
+ map[string]values.PlcValue{
+ "CurrentValue": response.GetValue(readTagName),
+ },
+ )
+ if interceptor != nil {
+ m.log.Trace().Msg("forwarding query result to interceptor")
+ interceptor(queryResult)
}
+ queryResults = append(queryResults, queryResult)
}
- default:
- m.log.Warn().Msgf("unsupported query type supplied %T", query)
- return apiModel.PlcResponseCode_INVALID_ADDRESS, nil
}
- m.log.Trace().Msgf("Browse done with \n%s", queryResults)
return apiModel.PlcResponseCode_OK, queryResults
}
-func (m Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
+func (m *Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInstalledUnitAddressBytes func(ctx context.Context) (map[byte]any, error)) ([]readWriteModel.UnitAddress, bool, error) {
if unitAddress := query.unitAddress; unitAddress != nil {
return []readWriteModel.UnitAddress{unitAddress}, false, nil
} else {
@@ -167,7 +170,7 @@ func (m Browser) extractUnits(ctx context.Context, query *unitInfoQuery, getInst
}
}
-func (m Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
+func (m *Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attribute, bool) {
if attribute := query.attribute; attribute != nil {
return []readWriteModel.Attribute{*attribute}, false
} else {
@@ -179,7 +182,7 @@ func (m Browser) extractAttributes(query *unitInfoQuery) ([]readWriteModel.Attri
}
}
-func (m Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any, error) {
+func (m *Browser) getInstalledUnitAddressBytes(ctx context.Context) (map[byte]any, error) {
start := time.Now()
defer func() {
m.log.Debug().Msgf("Ending unit address acquiring after %s", time.Since(start))
diff --git a/plc4go/internal/cbus/Browser_test.go b/plc4go/internal/cbus/Browser_test.go
index 7794f16574..5cf353d820 100644
--- a/plc4go/internal/cbus/Browser_test.go
+++ b/plc4go/internal/cbus/Browser_test.go
@@ -22,6 +22,7 @@ package cbus
import (
"context"
"fmt"
+ "github.com/rs/zerolog"
"net/url"
"sync"
"sync/atomic"
@@ -62,16 +63,16 @@ func TestBrowser_BrowseQuery(t *testing.T) {
query apiModel.PlcQuery
}
tests := []struct {
- name string
- fields fields
- args args
- setup func(t *testing.T, fields *fields)
- want apiModel.PlcResponseCode
- want1 []apiModel.PlcBrowseItem
+ name string
+ fields fields
+ args args
+ setup func(t *testing.T, fields *fields)
+ wantResponseCode apiModel.PlcResponseCode
+ wantQueryResults []apiModel.PlcBrowseItem
}{
{
- name: "invalid address",
- want: apiModel.PlcResponseCode_INVALID_ADDRESS,
+ name: "invalid address",
+ wantResponseCode: apiModel.PlcResponseCode_INVALID_ADDRESS,
},
{
name: "non responding browse",
@@ -177,8 +178,8 @@ func TestBrowser_BrowseQuery(t *testing.T) {
}
})
},
- want: apiModel.PlcResponseCode_OK,
- want1: []apiModel.PlcBrowseItem{
+ wantResponseCode: apiModel.PlcResponseCode_OK,
+ wantQueryResults: []apiModel.PlcBrowseItem{
&spiModel.DefaultPlcBrowseItem{
Tag: NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Manufacturer, 1),
Name: "testQuery",
@@ -202,8 +203,172 @@ func TestBrowser_BrowseQuery(t *testing.T) {
log: testutils.ProduceTestingLogger(t),
}
got, got1 := m.BrowseQuery(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
- assert.Equalf(t, tt.want, got, "BrowseQuery(%v, func(), %v,\n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
- assert.Equalf(t, tt.want1, got1, "BrowseQuery(%v, func(), %v, \n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+ assert.Equalf(t, tt.wantResponseCode, got, "BrowseQuery(%v, func(), %v,\n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+ assert.Equalf(t, tt.wantQueryResults, got1, "BrowseQuery(%v, func(), %v, \n%v\n)", tt.args.ctx, tt.args.queryName, tt.args.query)
+ if m.connection != nil && m.connection.IsConnected() {
+ t.Log("Closing connection")
+ <-m.connection.Close()
+ }
+ })
+ }
+}
+
+func TestBrowser_browseUnitInfo(t *testing.T) {
+ type fields struct {
+ DefaultBrowser _default.DefaultBrowser
+ connection plc4go.PlcConnection
+ sequenceCounter uint8
+ log zerolog.Logger
+ }
+ type args struct {
+ ctx context.Context
+ interceptor func(result apiModel.PlcBrowseItem) bool
+ queryName string
+ query *unitInfoQuery
+ }
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ setup func(t *testing.T, fields *fields)
+ wantResponseCode apiModel.PlcResponseCode
+ wantQueryResults []apiModel.PlcBrowseItem
+ }{
+ {
+ name: "non responding browse",
+ args: args{
+ ctx: testutils.TestContext(t),
+ interceptor: func(result apiModel.PlcBrowseItem) bool {
+ // No-OP
+ return true
+ },
+ queryName: "testQuery",
+ query: NewUnitInfoQuery(readWriteModel.NewUnitAddress(2), nil, 1).(*unitInfoQuery),
+ },
+ setup: func(t *testing.T, fields *fields) {
+ _options := testutils.EnrichOptionsWithOptionsForTesting(t)
+
+ transport := test.NewTransport(_options...)
+ transportUrl := url.URL{Scheme: "test"}
+ transportInstance, err := transport.CreateTransportInstance(transportUrl, nil, _options...)
+ require.NoError(t, err)
+ t.Cleanup(func() {
+ assert.NoError(t, transportInstance.Close())
+ })
+ type MockState uint8
+ const (
+ RESET MockState = iota
+ APPLICATION_FILTER_1
+ APPLICATION_FILTER_2
+ INTERFACE_OPTIONS_3
+ INTERFACE_OPTIONS_1_PUN
+ INTERFACE_OPTIONS_1
+ MANUFACTURER
+ DONE
+ )
+ currentState := atomic.Value{}
+ currentState.Store(RESET)
+ stateChangeMutex := sync.Mutex{}
+ transportInstance.(*test.TransportInstance).SetWriteInterceptor(func(transportInstance *test.TransportInstance, data []byte) {
+ stateChangeMutex.Lock()
+ defer stateChangeMutex.Unlock()
+ switch currentState.Load().(MockState) {
+ case RESET:
+ t.Log("Dispatching reset echo")
+ transportInstance.FillReadBuffer([]byte("~~~\r"))
+ currentState.Store(APPLICATION_FILTER_1)
+ case APPLICATION_FILTER_1:
+ t.Log("Dispatching app1 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32100FF\r"))
+ transportInstance.FillReadBuffer([]byte("322100AD\r\n"))
+ currentState.Store(APPLICATION_FILTER_2)
+ case APPLICATION_FILTER_2:
+ t.Log("Dispatching app2 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A32200FF\r"))
+ transportInstance.FillReadBuffer([]byte("322200AC\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_3)
+ case INTERFACE_OPTIONS_3:
+ t.Log("Dispatching interface 3 echo and confirm")
+ transportInstance.FillReadBuffer([]byte("@A342000A\r"))
+ transportInstance.FillReadBuffer([]byte("3242008C\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_1_PUN)
+ case INTERFACE_OPTIONS_1_PUN:
+ t.Log("Dispatching interface 1 PUN echo and confirm???")
+ transportInstance.FillReadBuffer([]byte("@A3410079\r"))
+ transportInstance.FillReadBuffer([]byte("3241008D\r\n"))
+ currentState.Store(INTERFACE_OPTIONS_1)
+ case INTERFACE_OPTIONS_1:
+ t.Log("Dispatching interface 1 echo and confirm???")
+ transportInstance.FillReadBuffer([]byte("@A3300079\r"))
+ transportInstance.FillReadBuffer([]byte("3230009E\r\n"))
+ currentState.Store(MANUFACTURER)
+ case MANUFACTURER:
+ t.Log("Dispatching manufacturer")
+ transportInstance.FillReadBuffer([]byte("g.890050435F434E49454422\r\n"))
+ currentState.Store(DONE)
+ case DONE:
+ t.Log("Connection dance done")
+ go func() {
+ time.Sleep(200 * time.Millisecond)
+ t.Log("Dispatching 3 MMI segments")
+ transportInstance.FillReadBuffer([]byte("86020200F900FF0094120006000000000000000008000000000000000000CA\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F900FF580000000000000000000000000000000000000000000026\r\n"))
+ transportInstance.FillReadBuffer([]byte("86020200F700FFB00000000000000000000000000000000000000000D0\r\n"))
+ }()
+ }
+ })
+ err = transport.AddPreregisteredInstances(transportUrl, transportInstance)
+ require.NoError(t, err)
+ driver := NewDriver(_options...)
+ connectionConnectResult := <-driver.GetConnection(transportUrl, map[string]transports.Transport{"test": transport}, map[string][]string{})
+ if err := connectionConnectResult.GetErr(); err != nil {
+ t.Error(err)
+ t.FailNow()
+ }
+ fields.connection = connectionConnectResult.GetConnection()
+ t.Cleanup(func() {
+ timer := time.NewTimer(10 * time.Second)
+ t.Cleanup(func() {
+ utils.CleanupTimer(timer)
+ })
+ select {
+ case <-fields.connection.Close():
+ case <-timer.C:
+ t.Error("timeout")
+ }
+ })
+ },
+ wantResponseCode: apiModel.PlcResponseCode_OK,
+ wantQueryResults: []apiModel.PlcBrowseItem{
+ &spiModel.DefaultPlcBrowseItem{
+ Tag: NewCALIdentifyTag(readWriteModel.NewUnitAddress(2), nil, readWriteModel.Attribute_Manufacturer, 1),
+ Name: "testQuery",
+ Readable: true,
+ Options: map[string]apiValues.PlcValue{
+ "CurrentValue": spiValues.NewPlcSTRING("PC_CNIED"),
+ },
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if tt.setup != nil {
+ tt.setup(t, &tt.fields)
+ }
+ m := &Browser{
+ DefaultBrowser: tt.fields.DefaultBrowser,
+ connection: tt.fields.connection,
+ sequenceCounter: tt.fields.sequenceCounter,
+ log: tt.fields.log,
+ }
+ gotResponseCode, gotQueryResults := m.browseUnitInfo(tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+ assert.Equalf(t, tt.wantResponseCode, gotResponseCode, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+ assert.Equalf(t, tt.wantQueryResults, gotQueryResults, "browseUnitInfo(%v, %v, %v, %v)", tt.args.ctx, tt.args.interceptor, tt.args.queryName, tt.args.query)
+ if m.connection != nil && m.connection.IsConnected() {
+ t.Log("Closing connection")
+ <-m.connection.Close()
+ }
})
}
}
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index 69ac5d4818..154a2b299a 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -270,7 +270,7 @@ func (c *Connection) startSubscriptionHandler() {
}
}
}
- salLogger.Info().Msg("Ending SAL handler")
+ salLogger.Info().Msg("handler ended")
}()
c.log.Debug().Msg("Starting MMI handler")
c.handlerWaitGroup.Add(1)
@@ -298,7 +298,7 @@ func (c *Connection) startSubscriptionHandler() {
}
}
}
- mmiLogger.Info().Msg("Ending MMI handler")
+ mmiLogger.Info().Msg("handler ended")
}()
}
diff --git a/plc4go/internal/cbus/Connection_test.go b/plc4go/internal/cbus/Connection_test.go
index e4c287bd36..7a51b25a2b 100644
--- a/plc4go/internal/cbus/Connection_test.go
+++ b/plc4go/internal/cbus/Connection_test.go
@@ -219,7 +219,6 @@ func TestConnection_ConnectWithContext(t *testing.T) {
}
assert.True(t, tt.wantAsserter(t, c.ConnectWithContext(tt.args.ctx)), "ConnectWithContext(%v)", tt.args.ctx)
// To shut down properly we always do that
- time.Sleep(20 * time.Millisecond)
c.SetConnected(false)
c.handlerWaitGroup.Wait()
})
@@ -1755,7 +1754,6 @@ func TestConnection_setupConnection(t *testing.T) {
}
c.setupConnection(tt.args.ctx, tt.args.ch)
// To shut down properly we always do that
- time.Sleep(20 * time.Millisecond)
c.SetConnected(false)
c.handlerWaitGroup.Wait()
})
@@ -1857,7 +1855,6 @@ func TestConnection_startSubscriptionHandler(t *testing.T) {
}
c.startSubscriptionHandler()
// To shut down properly we always do that
- time.Sleep(20 * time.Millisecond)
c.SetConnected(false)
c.handlerWaitGroup.Wait()
})