You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2022/08/02 16:09:58 UTC

[plc4x] branch develop updated: fix(plc4go/cbus): display MMI's with a default incoming message handler for now

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/develop by this push:
     new a5af69e83 fix(plc4go/cbus): display MMI's with a default incoming message handler for now
a5af69e83 is described below

commit a5af69e83bc8bee346a976af24d37d6b2c1b7394
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Tue Aug 2 18:09:50 2022 +0200

    fix(plc4go/cbus): display MMI's with a default incoming message handler for now
---
 plc4go/internal/bacnetip/Connection.go             |  2 ++
 plc4go/internal/cbus/Connection.go                 | 23 ++++++++++++++++++++--
 plc4go/internal/spi/transports/tcp/Transport.go    |  3 ++-
 .../server/cbus/protocol/CBusServerAdapter.java    |  3 ++-
 4 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/plc4go/internal/bacnetip/Connection.go b/plc4go/internal/bacnetip/Connection.go
index 93fd24a9a..09a04fe18 100644
--- a/plc4go/internal/bacnetip/Connection.go
+++ b/plc4go/internal/bacnetip/Connection.go
@@ -28,6 +28,7 @@ import (
 	apiModel "github.com/apache/plc4x/plc4go/pkg/api/model"
 	"github.com/rs/zerolog/log"
 	"sync"
+	"time"
 )
 
 type Connection struct {
@@ -84,6 +85,7 @@ func (c *Connection) Connect() <-chan plc4go.PlcConnectionConnectResult {
 				case message := <-incomingMessageChannel:
 					// TODO: implement mapping to subscribers
 					log.Info().Msgf("Received \n%v", message)
+				case <-time.After(20 * time.Millisecond):
 				}
 			}
 			log.Info().Msg("Ending incoming message transfer")
diff --git a/plc4go/internal/cbus/Connection.go b/plc4go/internal/cbus/Connection.go
index e9375d18c..df533e654 100644
--- a/plc4go/internal/cbus/Connection.go
+++ b/plc4go/internal/cbus/Connection.go
@@ -217,6 +217,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 			return
 		}
 
+		startTime := time.Now()
 		select {
 		case <-receivedResetEchoChan:
 			log.Debug().Msgf("We received the echo")
@@ -224,7 +225,7 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 			c.fireConnectionError(errors.Wrap(err, "Error receiving of Reset"), ch)
 			return
 		case timeout := <-time.After(time.Second * 2):
-			c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
+			c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
 			return
 		}
 		log.Debug().Msg("Reset done")
@@ -279,6 +280,23 @@ func (c *Connection) setupConnection(ch chan plc4go.PlcConnectionConnectResult)
 		}
 	}()
 	log.Debug().Msg("Subscription handler stated")
+
+	log.Debug().Msg("Starting default incoming message handler")
+	go func() {
+		for c.IsConnected() {
+			log.Debug().Msg("Polling data")
+			incomingMessageChannel := c.messageCodec.GetDefaultIncomingMessageChannel()
+			select {
+			case message := <-incomingMessageChannel:
+				// TODO: forward that to the subscriber...
+				// TODO: implement mapping to subscribers
+				log.Info().Msgf("Received \n%v", message)
+			case <-time.After(20 * time.Millisecond):
+			}
+		}
+		log.Info().Msg("Ending default incoming message handler")
+	}()
+	log.Debug().Msg("default incoming message handler started")
 }
 
 func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult, paramNo readWriteModel.Parameter, parameterValue readWriteModel.ParameterValue, requestContext *readWriteModel.RequestContext, cbusOptions *readWriteModel.CBusOptions) bool {
@@ -348,6 +366,7 @@ func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult,
 		return false
 	}
 
+	startTime := time.Now()
 	select {
 	case <-directCommandAckChan:
 		log.Debug().Msgf("We received the ack")
@@ -355,7 +374,7 @@ func (c *Connection) sendCalDataWrite(ch chan plc4go.PlcConnectionConnectResult,
 		c.fireConnectionError(errors.Wrap(err, "Error receiving of ack"), ch)
 		return false
 	case timeout := <-time.After(time.Second * 2):
-		c.fireConnectionError(errors.Errorf("Timeout after %v", timeout), ch)
+		c.fireConnectionError(errors.Errorf("Timeout after %v", timeout.Sub(startTime)), ch)
 		return false
 	}
 	return true
diff --git a/plc4go/internal/spi/transports/tcp/Transport.go b/plc4go/internal/spi/transports/tcp/Transport.go
index 10cbe6cc7..da5585db8 100644
--- a/plc4go/internal/spi/transports/tcp/Transport.go
+++ b/plc4go/internal/spi/transports/tcp/Transport.go
@@ -21,6 +21,7 @@ package tcp
 
 import (
 	"bufio"
+	"fmt"
 	"github.com/apache/plc4x/plc4go/internal/spi/transports"
 	"github.com/apache/plc4x/plc4go/internal/spi/utils"
 	"github.com/pkg/errors"
@@ -84,7 +85,7 @@ func (m Transport) CreateTransportInstance(transportUrl url.URL, options map[str
 	}
 
 	// Potentially resolve the ip address, if a hostname was provided
-	tcpAddr, err := net.ResolveTCPAddr("tcp", address+":"+strconv.Itoa(port))
+	tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", address, port))
 	if err != nil {
 		return nil, errors.Wrap(err, "error resolving typ address")
 	}
diff --git a/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java b/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
index 74bc2198a..9e277b357 100644
--- a/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
+++ b/sandbox/plc-simulator/src/main/java/org/apache/plc4x/simulator/server/cbus/protocol/CBusServerAdapter.java
@@ -84,7 +84,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
         if (!smart && !connect) {
             // In this mode every message will be echoed
             LOGGER.info("Sending echo");
-            ctx.writeAndFlush(msg);
+            ctx.write(msg);
         }
         try {
             writeLock.lock();
@@ -338,6 +338,7 @@ public class CBusServerAdapter extends ChannelInboundHandlerAdapter {
                 return;
             }
         } finally {
+            ctx.flush();
             writeLock.unlock();
         }
     }