You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/17 09:13:10 UTC
[plc4x] 03/03: fix(plc4go/cbus): fixed issue when connection doesn't work on a power up notification
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit e514fe10e5fb3cda2a32587b90ceaf5221f02436
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Wed Aug 17 11:13:00 2022 +0200
fix(plc4go/cbus): fixed issue when connection doesn't work on a power up notification
---
plc4go/internal/cbus/Connection.go | 61 ++++++++++++++++------
.../apache/plc4x/java/cbus/RandomPackagesTest.java | 1 +
2 files changed, 46 insertions(+), 16 deletions(-)
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index de040eede..16315f92b 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -181,10 +181,13 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
cbusOptions := &c.messageCodec.(*MessageCodec).cbusOptions
requestContext := &c.messageCodec.(*MessageCodec).requestContext
- // TODO: Sometimes we get a power up and we need a second reset
- if !c.sendReset(ctx, ch, cbusOptions, requestContext) {
- log.Trace().Msg("Reset failed")
- return
+ if !c.sendReset(ctx, ch, cbusOptions, requestContext, false) {
+ log.Warn().Msg("First reset failed")
+ // We try a second reset in case we get a power up
+ if !c.sendReset(ctx, ch, cbusOptions, requestContext, true) {
+ log.Trace().Msg("Reset failed")
+ return
+ }
}
if !c.setApplicationFilter(ctx, ch, requestContext, cbusOptions) {
log.Trace().Msg("Set application filter failed")
@@ -250,8 +253,8 @@ func (c *Connection) setupConnection(ctx context.Context, ch chan plc4go.PlcConn
}()
}
-func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext) (ok bool) {
- log.Debug().Msg("Send a reset")
+func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnectionConnectResult, cbusOptions *readWriteModel.CBusOptions, requestContext *readWriteModel.RequestContext, sendOutErrorNotification bool) (ok bool) {
+ log.Debug().Msgf("Send a reset (sendOutErrorNotification: %t)", sendOutErrorNotification)
requestTypeReset := readWriteModel.RequestType_RESET
requestReset := readWriteModel.NewRequestReset(requestTypeReset, &requestTypeReset, requestTypeReset, &requestTypeReset, requestTypeReset, nil, &requestTypeReset, requestTypeReset, readWriteModel.NewRequestTermination(), *cbusOptions)
cBusMessage := readWriteModel.NewCBusMessageToServer(requestReset, *requestContext, *cbusOptions)
@@ -259,14 +262,28 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
receivedResetEchoChan := make(chan bool)
receivedResetEchoErrorChan := make(chan error)
if err := c.messageCodec.SendRequest(ctx, cBusMessage, func(message spi.Message) bool {
- cbusMessageToServer, ok := message.(readWriteModel.CBusMessageToServerExactly)
- if !ok {
- return false
+ switch message := message.(type) {
+ case readWriteModel.CBusMessageToClientExactly:
+ if reply, ok := message.GetReply().(readWriteModel.ReplyOrConfirmationReplyExactly); ok {
+ _, ok := reply.GetReply().(readWriteModel.PowerUpReplyExactly)
+ return ok
+ }
+ case readWriteModel.CBusMessageToServerExactly:
+ _, ok = message.GetRequest().(readWriteModel.RequestResetExactly)
+ return ok
}
- _, ok = cbusMessageToServer.GetRequest().(readWriteModel.RequestResetExactly)
- return ok
+ return false
}, func(message spi.Message) error {
- receivedResetEchoChan <- true
+ switch message.(type) {
+ case readWriteModel.CBusMessageToClientExactly:
+ // This is the powerup notification
+ go func() { receivedResetEchoChan <- false }()
+ case readWriteModel.CBusMessageToServerExactly:
+ // This is the echo
+ go func() { receivedResetEchoChan <- true }()
+ default:
+ return errors.Errorf("Unmapped type %T", message)
+ }
return nil
}, func(err error) error {
// If this is a timeout, do a check if the connection requires a reconnection
@@ -277,7 +294,11 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
receivedResetEchoErrorChan <- errors.Wrap(err, "got error processing request")
return nil
}, c.GetTtl()); err != nil {
- c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
+ if sendOutErrorNotification {
+ c.fireConnectionError(errors.Wrap(err, "Error during sending of Reset Request"), ch)
+ } else {
+ log.Warn().Err(err).Msg("connect failed")
+ }
return false
}
@@ -286,10 +307,18 @@ func (c *Connection) sendReset(ctx context.Context, ch chan plc4go.PlcConnection
case <-receivedResetEchoChan:
log.Debug().Msgf("We received the echo")
case err := <-receivedResetEchoErrorChan:
- c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
+ if sendOutErrorNotification {
+ c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
+ } else {
+ log.Trace().Err(err).Msg("connect failed")
+ }
return false
- case timeout := <-time.After(time.Second * 2):
- c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
+ case timeout := <-time.After(time.Millisecond * 500):
+ if sendOutErrorNotification {
+ c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
+ } else {
+ log.Trace().Msg("timeout")
+ }
return false
}
log.Debug().Msg("Reset done")
diff --git a/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java b/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
index ab42558ad..58dfc0b55 100644
--- a/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
+++ b/plc4j/drivers/c-bus/src/test/java/org/apache/plc4x/java/cbus/RandomPackagesTest.java
@@ -344,6 +344,7 @@ public class RandomPackagesTest {
@Disabled("Not clear yet what this is")
@Test
void closestFitIsAStatusRequestButWeDonTHaveAnyBytesBeforeThat() throws Exception {
+ // TODO: this seem to be BinaryStateDeprecated for all applications
byte[] bytes = "FAFF00r\r".getBytes(StandardCharsets.UTF_8);
ReadBufferByteBased readBufferByteBased = new ReadBufferByteBased(bytes);
CBusMessage msg = CBusMessage.staticParse(readBufferByteBased, false, requestContext, cBusOptions);