You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/01 15:00:57 UTC
[plc4x] 04/04: fix(plc4go/cbus): implement connection setup
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit f20adc91761cfc6c2acb1f1c678360b2daf37112
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 1 16:53:04 2022 +0200
fix(plc4go/cbus): implement connection setup
---
plc4go/internal/cbus/Connection.go | 142 +++++++++++++++++++++++++++++++------
plc4go/tools/plc4xbrowser/main.go | 1 -
2 files changed, 119 insertions(+), 24 deletions(-)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index fe88e0a87..c57d8143e 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -26,8 +26,11 @@ import (
internalModel "github.com/apache/plc4x/plc4go/internal/spi/model"
"github.com/apache/plc4x/plc4go/pkg/api"
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+ readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
+ "github.com/pkg/errors"
"github.com/rs/zerolog/log"
"sync"
+ "time"
)
type AlphaGenerator struct {
@@ -101,6 +104,31 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
return c.messageCodec
}
+func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+ log.Trace().Msg("Connecting")
+ ch := make(chan plc4go.PlcConnectionConnectResult)
+ go func() {
+ err := c.messageCodec.Connect()
+ if err != nil {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, err)
+ }
+
+ // For testing purposes we can skip the waiting for a complete connection
+ if !c.driverContext.awaitSetupComplete {
+ go c.setupConnection(ch)
+ log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
+ // Here we write directly and don't wait till the connection is "really" connected
+ // Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, err)
+ c.SetConnected(true)
+ return
+ }
+
+ c.setupConnection(ch)
+ }()
+ return ch
+}
+
func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
return _default.DefaultConnectionMetadata{
ProvidesReading: true,
@@ -110,29 +138,6 @@ func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
}
}
-func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
- connectionConnectResult := c.DefaultConnection.Connect()
- ch := make(chan plc4go.PlcConnectionConnectResult)
- go func() {
- connectResult := <-connectionConnectResult
- if connectResult.GetErr() == nil {
- log.Debug().Msg("Starting subscription handler")
- go func() {
- for c.IsConnected() {
- log.Debug().Msg("Handling incoming message")
- for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
- for _, subscriber := range c.subscribers {
- subscriber.handleMonitoredSal(monitoredSal)
- }
- }
- }
- }()
- }
- ch <- connectResult
- }()
- return ch
-}
-
func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
return internalModel.NewDefaultPlcReadRequestBuilder(c.GetPlcFieldHandler(), NewReader(&c.alphaGenerator, c.messageCodec, c.tm))
}
@@ -168,3 +173,94 @@ func (c *Connection) addSubscriber(subscriber *Subscriber) {
func (c *Connection) String() string {
return fmt.Sprintf("cbus.Connection")
}
+
+func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) {
+ cbusOptions := &c.messageCodec.(*MessageCodec).cbusOptions
+ requestContext := &c.messageCodec.(*MessageCodec).requestContext
+
+ {
+ log.Debug().Msg("Send a reset Request")
+ requestTypeReset := readWriteModel.RequestType_RESET
+ requestTypeResetByte := byte(readWriteModel.RequestType_RESET)
+ requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeResetByte, requestTypeReset, &requestTypeResetByte, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
+ if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+ return
+ }
+ time.Sleep(time.Millisecond * 100)
+ }
+ {
+ log.Debug().Msg("Set application filter to all")
+ applicationAddress1 := readWriteModel.NewParameterValueApplicationAddress1(readWriteModel.NewApplicationAddress1(0xFF), 1)
+ calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_APPLICATION_ADDRESS_1, 0x0, applicationAddress1, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+ directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+ if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+ return
+ }
+ time.Sleep(time.Millisecond * 100)
+ }
+ {
+ log.Debug().Msg("Set interface options 3")
+ interfaceOptions3 := readWriteModel.NewParameterValueInterfaceOptions3(readWriteModel.NewInterfaceOptions3(true, false, true, false), 1)
+ calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_3, 0x0, interfaceOptions3, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+ directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+ if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+ return
+ }
+ time.Sleep(time.Millisecond * 100)
+ }
+ {
+ log.Debug().Msg("Set interface options 1 power up settings")
+ interfaceOptions1PowerUpSettings := readWriteModel.NewParameterValueInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1(true, true, true, true, false, true)), 1)
+ calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, 0x0, interfaceOptions1PowerUpSettings, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+ directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+ if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+ return
+ }
+ time.Sleep(time.Millisecond * 100)
+ }
+ {
+ log.Debug().Msg("Set interface options 1")
+ interfaceOptions1 := readWriteModel.NewParameterValueInterfaceOptions1(readWriteModel.NewInterfaceOptions1(true, true, true, true, false, true), 1)
+ calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_1, 0x0, interfaceOptions1, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+ directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+ if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
+ c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+ return
+ }
+ time.Sleep(time.Millisecond * 100)
+ }
+ c.fireConnected(ch)
+
+ log.Debug().Msg("Starting subscription handler")
+ go func() {
+ for c.IsConnected() {
+ log.Debug().Msg("Handling incoming message")
+ for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+ for _, subscriber := range c.subscribers {
+ subscriber.handleMonitoredSal(monitoredSal)
+ }
+ }
+ }
+ }()
+}
+
+func (c *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
+ if c.driverContext.awaitSetupComplete {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
+ } else {
+ log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
+ }
+}
+
+func (c *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult) {
+ if c.driverContext.awaitSetupComplete {
+ ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
+ } else {
+ log.Info().Msg("Successfully connected")
+ }
+ c.SetConnected(true)
+}
diff --git a/plc4go/tools/plc4xbrowser/main.go b/plc4go/tools/plc4xbrowser/main.go
index f43b319d7..951f634ad 100644
--- a/plc4go/tools/plc4xbrowser/main.go
+++ b/plc4go/tools/plc4xbrowser/main.go
@@ -250,7 +250,6 @@ func buildCommandArea(newPrimitive func(text string) tview.Primitive, applicatio
entries = append(entries, "subscribe "+connectionsString)
}
}
- log.Info().Msgf("%v %v", entries, config.History.Last10Hosts)
return
})
commandArea.AddItem(commandInputField, 2, 0, 1, 1, 0, 0, true)