You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2021/01/10 11:59:27 UTC

[plc4x] 01/02: Added some close logic

This is an automated email from the ASF dual-hosted git repository.

hutcheb pushed a commit to branch feature/native_opua_client
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit 9ddb5d3c192ce3b1fa894caa2f699f7d751e2b9a
Author: hutcheb <be...@gmail.com>
AuthorDate: Sat Jan 9 21:21:57 2021 -0500

    Added some close logic
---
 .../apache/plc4x/java/opcua/OpcuaPlcDriver.java    |  4 +++-
 .../java/opcua/protocol/OpcuaProtocolLogic.java    | 27 +++++++--------------
 .../apache/plc4x/java/spi/Plc4xNettyWrapper.java   |  2 ++
 .../spi/connection/DefaultNettyPlcConnection.java  | 28 ++++++++++++----------
 .../java/spi/events/CloseConnectionEvent.java      |  4 ++++
 .../spi/internal/DefaultExpectRequestContext.java  |  1 +
 6 files changed, 34 insertions(+), 32 deletions(-)

diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java
index efb16b3..2875e8b 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java
@@ -233,8 +233,10 @@ public class OpcuaPlcDriver extends GeneratedDriverBase<OpcuaAPU> {
     /** Estimate the Length of a Packet */
     public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
         @Override
-        public int applyAsInt(ByteBuf byteBuf) {            
+        public int applyAsInt(ByteBuf byteBuf) {
+            System.out.println("Estimate:- ");
             if (byteBuf.readableBytes() >= 8) {
+                System.out.println("Estimate:- " + Integer.reverseBytes(byteBuf.getInt(byteBuf.readerIndex() + 4)));
                 return Integer.reverseBytes(byteBuf.getInt(byteBuf.readerIndex() + 4));
             }
             return -1;
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
index f05e906..b8220da 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
@@ -126,12 +126,12 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
     }
 
     @Override
-    public void onDisconnect(ConversationContext<OpcuaAPU> context) {
-
+    public void close(ConversationContext<OpcuaAPU> context) {
+        //Nothing
     }
 
     @Override
-    public void close(ConversationContext<OpcuaAPU> context) {
+    public void onDisconnect(ConversationContext<OpcuaAPU> context) {
         int transactionId = transactionIdentifierGenerator.getAndIncrement();
         if(transactionIdentifierGenerator.get() == 0xFFFF) {
             transactionIdentifierGenerator.set(1);
@@ -168,22 +168,14 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
             transactionId,
             closeSessionRequest);
 
-        CompletableFuture<Boolean> voidCompletableFuture = new CompletableFuture<>();
-
         context.sendRequest(new OpcuaAPU(messageRequest))
             .expectResponse(OpcuaAPU.class, REQUEST_TIMEOUT)
             .check(p -> p.getMessage() instanceof OpcuaMessageResponse)
             .unwrap(p -> (OpcuaMessageResponse) p.getMessage())
             .handle(opcuaMessageResponse -> {
-                LOGGER.info("Got Close Session Response Connection Response");
-                voidCompletableFuture.complete(true);
-            });
-
-        try {
-            voidCompletableFuture.get(REQUEST_TIMEOUT_LONG, TimeUnit.MILLISECONDS);
-        } catch (Exception e) {
-            LOGGER.debug("Timeout while waiting for session to close");
-        }
+                    LOGGER.info("Got Close Session Response Connection Response" + opcuaMessageResponse.toString());
+                    context.fireDisconnected();
+                });
     }
 
     @Override
@@ -261,6 +253,8 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
             transactionId,
             openSecureChannelRequest);
 
+
+
         context.sendRequest(new OpcuaAPU(openRequest))
             .expectResponse(OpcuaAPU.class, REQUEST_TIMEOUT)
             .check(p -> p.getMessage() instanceof OpcuaOpenResponse)
@@ -661,11 +655,6 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
         return future;
     }
 
-    @Override
-    protected void decode(ConversationContext<OpcuaAPU> context, OpcuaAPU msg) throws Exception {
-        LOGGER.info("Error while reading value from OPC UA server error code 11");
-        super.decode(context, msg);
-    }
 
 
     private long getCurrentDateTime() {
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index ea5c894..9aba597 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -98,6 +98,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
 
             @Override
             public ExpectRequestContext<T> expectRequest(Class<T> clazz, Duration timeout) {
+                System.out.println("Testing BH ---");
                 return new DefaultExpectRequestContext<>(handler -> {
                     logger.trace("Adding Request Handler ...");
                     registeredHandlers.add(handler);
@@ -235,6 +236,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
 
         @Override
         public SendRequestContext<T1> sendRequest(T1 packet) {
+            System.out.println("Testing BH ---&&");
             return new DefaultSendRequestContext<>(handler -> {
                 logger.trace("Adding Response Handler ...");
                 registeredHandlers.add(handler);
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
index 3a718a6..1fc70ad 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
@@ -18,21 +18,14 @@
  */
 package org.apache.plc4x.java.spi.connection;
 
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.*;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcIoException;
 import org.apache.plc4x.java.spi.configuration.Configuration;
 import org.apache.plc4x.java.spi.configuration.ConfigurationFactory;
-import org.apache.plc4x.java.spi.events.CloseConnectionEvent;
-import org.apache.plc4x.java.spi.events.ConnectEvent;
-import org.apache.plc4x.java.spi.events.ConnectedEvent;
+import org.apache.plc4x.java.spi.events.*;
 import org.apache.plc4x.java.spi.optimizer.BaseOptimizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +33,7 @@ import org.apache.plc4x.java.api.value.PlcValueHandler;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 public class DefaultNettyPlcConnection extends AbstractPlcConnection implements ChannelExposingConnection {
 
@@ -54,6 +48,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
     protected final ChannelFactory channelFactory;
     protected final boolean awaitSessionSetupComplete;
     protected final ProtocolStackConfigurer stackConfigurer;
+    private final CompletableFuture<Void> sessionDisconnectCompleteFuture = new CompletableFuture<>();
 
     protected Channel channel;
     protected boolean connected;
@@ -87,7 +82,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
             ConfigurationFactory.configure(configuration, channelFactory);
 
             // Have the channel factory create a new channel instance.
-            channel = channelFactory.createChannel(getChannelHandler(sessionSetupCompleteFuture));
+            channel = channelFactory.createChannel(getChannelHandler(sessionSetupCompleteFuture, sessionDisconnectCompleteFuture));
             channel.closeFuture().addListener(future -> {
                 if (!sessionSetupCompleteFuture.isDone()) {
                     sessionSetupCompleteFuture.completeExceptionally(
@@ -115,8 +110,15 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
     @Override
     public void close() throws PlcConnectionException {
         // TODO call protocols close method
+
+        channel.pipeline().fireUserEventTriggered(new DisconnectEvent());
+        try {
+            sessionDisconnectCompleteFuture.get(10000L, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            //Do Nothing
+        }
         channel.pipeline().fireUserEventTriggered(new CloseConnectionEvent());
-        // Close channel
+
         channel.close().awaitUninterruptibly();
         channel = null;
         connected = false;
@@ -136,7 +138,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
         return channel;
     }
 
-    public ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
+    public ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture, CompletableFuture<Void> sessionDisconnectCompleteFuture) {
         if (stackConfigurer == null) {
             throw new IllegalStateException("No Protocol Stack Configurer is given!");
         }
@@ -153,6 +155,8 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
                     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                         if (evt instanceof ConnectedEvent) {
                             sessionSetupCompleteFuture.complete(null);
+                        } else if (evt instanceof DisconnectedEvent) {
+                            sessionDisconnectCompleteFuture.complete(null);
                         } else {
                             super.userEventTriggered(ctx, evt);
                         }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/events/CloseConnectionEvent.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/events/CloseConnectionEvent.java
index 6cb3404..912dcd1 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/events/CloseConnectionEvent.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/events/CloseConnectionEvent.java
@@ -18,6 +18,10 @@
  */
 package org.apache.plc4x.java.spi.events;
 
+import io.netty.util.concurrent.CompleteFuture;
+
+import java.util.concurrent.CompletableFuture;
+
 /** Signals the Protocol to Close the Connection */
 public class CloseConnectionEvent {
 }
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
index 64c54f6..8b43f39 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
@@ -51,6 +51,7 @@ public class DefaultExpectRequestContext<T> implements ConversationContext.Expec
     private HandlerRegistration registration;
 
     public DefaultExpectRequestContext(Consumer<HandlerRegistration> finisher, Class<T> expectClazz, Duration timeout, ConversationContext context) {
+        System.out.println("Testing BH");
         this.finisher = finisher;
         this.expectClazz = expectClazz;
         this.timeout = timeout;