You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/01 15:00:57 UTC

[plc4x] 04/04: fix(plc4go/cbus): implement connection setup

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit f20adc91761cfc6c2acb1f1c678360b2daf37112
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Mon Aug 1 16:53:04 2022 +0200

    fix(plc4go/cbus): implement connection setup
---
 plc4go/internal/cbus/Connection.go | 142 +++++++++++++++++++++++++++++++------
 plc4go/tools/plc4xbrowser/main.go  |   1 -
 2 files changed, 119 insertions(+), 24 deletions(-)

diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index fe88e0a87..c57d8143e 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -26,8 +26,11 @@ import (
 	internalModel "github.com/apache/plc4x/plc4go/internal/spi/model"
 	"github.com/apache/plc4x/plc4go/pkg/api"
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
+	readWriteModel "github.com/apache/plc4x/plc4go/protocols/cbus/readwrite/model"
+	"github.com/pkg/errors"
 	"github.com/rs/zerolog/log"
 	"sync"
+	"time"
 )
 
 type AlphaGenerator struct {
@@ -101,6 +104,31 @@ func (c *Connection) GetMessageCodec() spi.MessageCodec {
 	return c.messageCodec
 }
 
+func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
+	log.Trace().Msg("Connecting")
+	ch := make(chan plc4go.PlcConnectionConnectResult)
+	go func() {
+		err := c.messageCodec.Connect()
+		if err != nil {
+			ch <- _default.NewDefaultPlcConnectionConnectResult(c, err)
+		}
+
+		// For testing purposes we can skip the waiting for a complete connection
+		if !c.driverContext.awaitSetupComplete {
+			go c.setupConnection(ch)
+			log.Warn().Msg("Connection used in an unsafe way. !!!DON'T USE IN PRODUCTION!!!")
+			// Here we write directly and don't wait till the connection is "really" connected
+			// Note: we can't use fireConnected here as it's guarded against m.driverContext.awaitSetupComplete
+			ch <- _default.NewDefaultPlcConnectionConnectResult(c, err)
+			c.SetConnected(true)
+			return
+		}
+
+		c.setupConnection(ch)
+	}()
+	return ch
+}
+
 func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
 	return _default.DefaultConnectionMetadata{
 		ProvidesReading:     true,
@@ -110,29 +138,6 @@ func (c *Connection) GetMetadata() apiModel.PlcConnectionMetadata {
 	}
 }
 
-func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
-	connectionConnectResult := c.DefaultConnection.Connect()
-	ch := make(chan plc4go.PlcConnectionConnectResult)
-	go func() {
-		connectResult := <-connectionConnectResult
-		if connectResult.GetErr() == nil {
-			log.Debug().Msg("Starting subscription handler")
-			go func() {
-				for c.IsConnected() {
-					log.Debug().Msg("Handling incoming message")
-					for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
-						for _, subscriber := range c.subscribers {
-							subscriber.handleMonitoredSal(monitoredSal)
-						}
-					}
-				}
-			}()
-		}
-		ch <- connectResult
-	}()
-	return ch
-}
-
 func (c *Connection) ReadRequestBuilder() apiModel.PlcReadRequestBuilder {
 	return internalModel.NewDefaultPlcReadRequestBuilder(c.GetPlcFieldHandler(), NewReader(&c.alphaGenerator, c.messageCodec, c.tm))
 }
@@ -168,3 +173,94 @@ func (c *Connection) addSubscriber(subscriber *Subscriber) {
 func (c *Connection) String() string {
 	return fmt.Sprintf("cbus.Connection")
 }
+
+func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult) {
+	cbusOptions := &c.messageCodec.(*MessageCodec).cbusOptions
+	requestContext := &c.messageCodec.(*MessageCodec).requestContext
+
+	{
+		log.Debug().Msg("Send a reset Request")
+		requestTypeReset := readWriteModel.RequestType_RESET
+		requestTypeResetByte := byte(readWriteModel.RequestType_RESET)
+		requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeResetByte, requestTypeReset, &requestTypeResetByte, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
+		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)); err != nil {
+			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+			return
+		}
+		time.Sleep(time.Millisecond * 100)
+	}
+	{
+		log.Debug().Msg("Set application filter to all")
+		applicationAddress1 := readWriteModel.NewParameterValueApplicationAddress1(readWriteModel.NewApplicationAddress1(0xFF), 1)
+		calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_APPLICATION_ADDRESS_1, 0x0, applicationAddress1, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+		directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
+			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+			return
+		}
+		time.Sleep(time.Millisecond * 100)
+	}
+	{
+		log.Debug().Msg("Set interface options 3")
+		interfaceOptions3 := readWriteModel.NewParameterValueInterfaceOptions3(readWriteModel.NewInterfaceOptions3(true, false, true, false), 1)
+		calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_3, 0x0, interfaceOptions3, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+		directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
+			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+			return
+		}
+		time.Sleep(time.Millisecond * 100)
+	}
+	{
+		log.Debug().Msg("Set interface options 1 power up settings")
+		interfaceOptions1PowerUpSettings := readWriteModel.NewParameterValueInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1PowerUpSettings(readWriteModel.NewInterfaceOptions1(true, true, true, true, false, true)), 1)
+		calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_1_POWER_UP_SETTINGS, 0x0, interfaceOptions1PowerUpSettings, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+		directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
+			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+			return
+		}
+		time.Sleep(time.Millisecond * 100)
+	}
+	{
+		log.Debug().Msg("Set interface options 1")
+		interfaceOptions1 := readWriteModel.NewParameterValueInterfaceOptions1(readWriteModel.NewInterfaceOptions1(true, true, true, true, false, true), 1)
+		calData := readWriteModel.NewCALDataWrite(readWriteModel.Parameter_INTERFACE_OPTIONS_1, 0x0, interfaceOptions1, readWriteModel.CALCommandTypeContainer_CALCommandWrite_3Bytes, nil, *requestContext)
+		directCommand := readWriteModel.NewRequestDirectCommandAccess(calData, 0x40, nil, nil, 0x0, readWriteModel.NewRequestTermination(), *cbusOptions)
+		if err := c.messageCodec.Send(readWriteModel.NewCBusMessageToServer(directCommand, *requestContext, *cbusOptions)); err != nil {
+			c.fireConnectionError(errors.Wrap(err, "Error writing reset"), ch)
+			return
+		}
+		time.Sleep(time.Millisecond * 100)
+	}
+	c.fireConnected(ch)
+
+	log.Debug().Msg("Starting subscription handler")
+	go func() {
+		for c.IsConnected() {
+			log.Debug().Msg("Handling incoming message")
+			for monitoredSal := range c.messageCodec.(*MessageCodec).monitoredSALs {
+				for _, subscriber := range c.subscribers {
+					subscriber.handleMonitoredSal(monitoredSal)
+				}
+			}
+		}
+	}()
+}
+
+func (c *Connection) fireConnectionError(err error, ch chan<- plc4go.PlcConnectionConnectResult) {
+	if c.driverContext.awaitSetupComplete {
+		ch <- _default.NewDefaultPlcConnectionConnectResult(nil, errors.Wrap(err, "Error during connection"))
+	} else {
+		log.Error().Err(err).Msg("awaitSetupComplete set to false and we got a error during connect")
+	}
+}
+
+func (c *Connection) fireConnected(ch chan<- plc4go.PlcConnectionConnectResult) {
+	if c.driverContext.awaitSetupComplete {
+		ch <- _default.NewDefaultPlcConnectionConnectResult(c, nil)
+	} else {
+		log.Info().Msg("Successfully connected")
+	}
+	c.SetConnected(true)
+}
diff --git a/plc4go/tools/plc4xbrowser/main.go b/plc4go/tools/plc4xbrowser/main.go
index f43b319d7..951f634ad 100644
--- a/plc4go/tools/plc4xbrowser/main.go
+++ b/plc4go/tools/plc4xbrowser/main.go
@@ -250,7 +250,6 @@ func buildCommandArea(newPrimitive func(text string) tview.Primitive, applicatio
 					entries = append(entries, "subscribe "+connectionsString)
 				}
 			}
-			log.Info().Msgf("%v %v", entries, config.History.Last10Hosts)
 			return
 		})
 		commandArea.AddItem(commandInputField, 2, 0, 1, 1, 0, 0, true)