You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 09:13:10 UTC

[plc4x] 03/03: fix(plc4go/cbus): fixed issue when connection doesn't work on a power up notification

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit e514fe10e5fb3cda2a32587b90ceaf5221f02436
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 11:13:00 2022 +0200

    fix(plc4go/cbus): fixed issue when connection doesn't work on a power up notification
---
 plc4go/internal/cbus/Connection.go                 | 61 ++++++++++++++++------
 .../apache/plc4x/java/cbus/RandomPackagesTest.java |  1 +
 2 files changed, 46 insertions(+), 16 deletions(-)

diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index de040eede..16315f92b 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -181,10 +181,13 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
 	cbusOptions := &c.messageCodec.(*MessageCodec).cbusOptions
 	requestContext := &c.messageCodec.(*MessageCodec).requestContext
 
-	// TODO: Sometimes we get a power up and we need a second reset
-	if !c.sendReset(ctx, ch, cbusOptions, requestContext) {
-		log.Trace().Msg("Reset failed")
-		return
+	if !c.sendReset(ctx, ch, cbusOptions, requestContext, false) {
+		log.Warn().Msg("First reset failed")
+		// We try a second reset in case we get a power up
+		if !c.sendReset(ctx, ch, cbusOptions, requestContext, true) {
+			log.Trace().Msg("Reset failed")
+			return
+		}
 	}
 	if !c.setApplicationFilter(ctx, ch, requestContext, cbusOptions) {
 		log.Trace().Msg("Set application filter failed")
@@ -250,8 +253,8 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
 	}()
 }
 
-func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext) (ok bool) {
-	log.Debug().Msg("Send a reset")
+func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) (ok bool) {
+	log.Debug().Msgf("Send a reset (sendOutErrorNotification: %t)", sendOutErrorNotification)
 	requestTypeReset := readWriteModel.RequestType_RESET
 	requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeReset, requestTypeReset, &requestTypeReset, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
 	cBusMessage := readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)
@@ -259,14 +262,28 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 	receivedResetEchoChan := make(chan bool)
 	receivedResetEchoErrorChan := make(chan error)
 	if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
-		cbusMessageToServer, ok := message.(readWriteModel.CBusMessageToServerExactly)
-		if !ok {
-			return false
+		switch message := message.(type) {
+		case readWriteModel.CBusMessageToClientExactly:
+			if reply, ok := message.GetReply().(readWriteModel.ReplyOrConfirmationReplyExactly); ok {
+				_, ok := reply.GetReply().(readWriteModel.PowerUpReplyExactly)
+				return ok
+			}
+		case readWriteModel.CBusMessageToServerExactly:
+			_, ok = message.GetRequest().(readWriteModel.RequestResetExactly)
+			return ok
 		}
-		_, ok = cbusMessageToServer.GetRequest().(readWriteModel.RequestResetExactly)
-		return ok
+		return false
 	}, func(message spi.Message) error {
-		receivedResetEchoChan <- true
+		switch message.(type) {
+		case readWriteModel.CBusMessageToClientExactly:
+			// This is the powerup notification
+			go func() { receivedResetEchoChan <- false }()
+		case readWriteModel.CBusMessageToServerExactly:
+			// This is the echo
+			go func() { receivedResetEchoChan <- true }()
+		default:
+			return errors.Errorf("Unmapped type %T", message)
+		}
 		return nil
 	}, func(err error) error {
 		// If this is a timeout, do a check if the connection requires a reconnection
@@ -277,7 +294,11 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 		receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
 		return nil
 	}, c.GetTtl()); err != nil {
-		c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
+		if sendOutErrorNotification {
+			c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
+		} else {
+			log.Warn().Err(err).Msg("connect failed")
+		}
 		return false
 	}
 
@@ -286,10 +307,18 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
 	case <-receivedResetEchoChan:
 		log.Debug().Msgf("We received the echo")
 	case err := <-receivedResetEchoErrorChan:
-		c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
+		if sendOutErrorNotification {
+			c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
+		} else {
+			log.Trace().Err(err).Msg("connect failed")
+		}
 		return false
-	case timeout := <-time.After(time.Second * 2):
-		c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
+	case timeout := <-time.After(time.Millisecond * 500):
+		if sendOutErrorNotification {
+			c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
+		} else {
+			log.Trace().Msg("timeout")
+		}
 		return false
 	}
 	log.Debug().Msg("Reset done")
diff --git a/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java b/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
index ab42558ad..58dfc0b55 100644
--- a/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
+++ b/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
@@ -344,6 +344,7 @@ public class RandomPackagesTest {
     @Disabled("Not clear yet what this is")
     @Test
     void closestFitIsAStatusRequestButWeDonTHaveAnyBytesBeforeThat() throws Exception {
+        // TODO: this seem to be BinaryStateDeprecated for all applications
         byte[] bytes = "FAFF00r\r".getBytes(StandardCharsets.UTF_8);
         ReadBufferByteBased readBufferByteBased = new ReadBufferByteBased(bytes);
         CBusMessage msg = CBusMessage.staticParse(readBufferByteBased, false, requestContext, cBusOptions);