You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/02 16:09:58 UTC
[plc4x] branch develop updated: fix(plc4go/cbus): display MMI's with a default incoming message handler for now
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git
The following commit(s) were added to refs/heads/develop by this push:
new a5af69e83 fix(plc4go/cbus): display MMI's with a default incoming message handler for now
a5af69e83 is described below
commit a5af69e83bc8bee346a976af24d37d6b2c1b7394
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Aug 2 18:09:50 2022 +0200
fix(plc4go/cbus): display MMI's with a default incoming message handler for now
---
plc4go/internal/bacnetip/Connection.go | 2 ++
plc4go/internal/cbus/Connection.go | 23 ++++++++++++++++++++--
plc4go/internal/spi/transports/tcp/Transport.go | 3 ++-
.../server/cbus/protocol/CBusServerAdapter.java | 3 ++-
4 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 93fd24a9a..09a04fe18 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -28,6 +28,7 @@ import (
apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
"github.com/rs/zerolog/log"
"sync"
+ "time"
)
type Connection struct {
@@ -84,6 +85,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
case message := <-incomingMessageChannel:
// TODO: implement mapping to subscribers
log.Info().Msgf("Received \n%v", message)
+ case <-time.After(20 * time.Millisecond):
}
}
log.Info().Msg("Ending incoming message transfer")
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index e9375d18c..df533e654 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -217,6 +217,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
return
}
+ startTime := time.Now()
select {
case <-receivedResetEchoChan:
log.Debug().Msgf("We received the echo")
@@ -224,7 +225,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
return
case timeout := <-time.After(time.Second * 2):
- c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
+ c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
return
}
log.Debug().Msg("Reset done")
@@ -279,6 +280,23 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
}
}()
log.Debug().Msg("Subscription handler stated")
+
+ log.Debug().Msg("Starting default incoming message handler")
+ go func() {
+ for c.IsConnected() {
+ log.Debug().Msg("Polling data")
+ incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
+ select {
+ case message := <-incomingMessageChannel:
+ // TODO: forward that to the subscriber...
+ // TODO: implement mapping to subscribers
+ log.Info().Msgf("Received \n%v", message)
+ case <-time.After(20 * time.Millisecond):
+ }
+ }
+ log.Info().Msg("Ending default incoming message handler")
+ }()
+ log.Debug().Msg("default incoming message handler started")
}
func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
@@ -348,6 +366,7 @@ func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult,
return false
}
+ startTime := time.Now()
select {
case <-directCommandAckChan:
log.Debug().Msgf("We received the ack")
@@ -355,7 +374,7 @@ func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult,
c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
return false
case timeout := <-time.After(time.Second * 2):
- c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
+ c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
return false
}
return true
diff --git a/plc4go/internal/spi/transports/tcp/Transport.go b/plc4go/internal/spi/transports/tcp/Transport.go
index 10cbe6cc7..da5585db8 100644
--- a/plc4go/internal/spi/transports/tcp/Transport.go
+++ b/plc4go/internal/spi/transports/tcp/Transport.go
@@ -21,6 +21,7 @@ package tcp
import (
"bufio"
+ "fmt"
"github.com/apache/plc4x/plc4go/internal/spi/transports"
"github.com/apache/plc4x/plc4go/internal/spi/utils"
"github.com/pkg/errors"
@@ -84,7 +85,7 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
}
// Potentially resolve the ip address, if a hostname was provided
- tcpAddr, err := net.ResolveTCPAddr("tcp", address+":"+strconv.Itoa(port))
+ tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", address, port))
if err != nil {
return nil, errors.Wrap(err, "error resolving typ address")
}
diff --git a/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java b/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
index 74bc2198a..9e277b357 100644
--- a/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
+++ b/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
@@ -84,7 +84,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
if (!smart && !connect) {
// In this mode every message will be echoed
LOGGER.info("Sending echo");
- ctx.writeAndFlush(msg);
+ ctx.write(msg);
}
try {
writeLock.lock();
@@ -338,6 +338,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
return;
}
} finally {
+ ctx.flush();
writeLock.unlock();
}
}