You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by hu...@apache.org on 2021/01/10 11:59:27 UTC
[plc4x] 01/02: Added some close logic
This is an automated email from the ASF dual-hosted git repository.
hutcheb pushed a commit to branch feature/native_opua_client
in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 9ddb5d3c192ce3b1fa894caa2f699f7d751e2b9a
Author: hutcheb <be...@gmail.com>
AuthorDate: Sat Jan 9 21:21:57 2021 -0500
Added some close logic
---
.../apache/plc4x/java/opcua/OpcuaPlcDriver.java | 4 +++-
.../java/opcua/protocol/OpcuaProtocolLogic.java | 27 +++++++--------------
.../apache/plc4x/java/spi/Plc4xNettyWrapper.java | 2 ++
.../spi/connection/DefaultNettyPlcConnection.java | 28 ++++++++++++----------
.../java/spi/events/CloseConnectionEvent.java | 4 ++++
.../spi/internal/DefaultExpectRequestContext.java | 1 +
6 files changed, 34 insertions(+), 32 deletions(-)
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java
index efb16b3..2875e8b 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/OpcuaPlcDriver.java
@@ -233,8 +233,10 @@ public class OpcuaPlcDriver extends GeneratedDriverBase<OpcuaAPU> {
/** Estimate the Length of a Packet */
public static class ByteLengthEstimator implements ToIntFunction<ByteBuf> {
@Override
- public int applyAsInt(ByteBuf byteBuf) {
+ public int applyAsInt(ByteBuf byteBuf) {
+ System.out.println("Estimate:- ");
if (byteBuf.readableBytes() >= 8) {
+ System.out.println("Estimate:- " + Integer.reverseBytes(byteBuf.getInt(byteBuf.readerIndex() + 4)));
return Integer.reverseBytes(byteBuf.getInt(byteBuf.readerIndex() + 4));
}
return -1;
diff --git a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
index f05e906..b8220da 100644
--- a/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
+++ b/plc4j/drivers/opcua/src/main/java/org/apache/plc4x/java/opcua/protocol/OpcuaProtocolLogic.java
@@ -126,12 +126,12 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
}
@Override
- public void onDisconnect(ConversationContext<OpcuaAPU> context) {
-
+ public void close(ConversationContext<OpcuaAPU> context) {
+ //Nothing
}
@Override
- public void close(ConversationContext<OpcuaAPU> context) {
+ public void onDisconnect(ConversationContext<OpcuaAPU> context) {
int transactionId = transactionIdentifierGenerator.getAndIncrement();
if(transactionIdentifierGenerator.get() == 0xFFFF) {
transactionIdentifierGenerator.set(1);
@@ -168,22 +168,14 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
transactionId,
closeSessionRequest);
- CompletableFuture<Boolean> voidCompletableFuture = new CompletableFuture<>();
-
context.sendRequest(new OpcuaAPU(messageRequest))
.expectResponse(OpcuaAPU.class, REQUEST_TIMEOUT)
.check(p -> p.getMessage() instanceof OpcuaMessageResponse)
.unwrap(p -> (OpcuaMessageResponse) p.getMessage())
.handle(opcuaMessageResponse -> {
- LOGGER.info("Got Close Session Response Connection Response");
- voidCompletableFuture.complete(true);
- });
-
- try {
- voidCompletableFuture.get(REQUEST_TIMEOUT_LONG, TimeUnit.MILLISECONDS);
- } catch (Exception e) {
- LOGGER.debug("Timeout while waiting for session to close");
- }
+ LOGGER.info("Got Close Session Response Connection Response" + opcuaMessageResponse.toString());
+ context.fireDisconnected();
+ });
}
@Override
@@ -261,6 +253,8 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
transactionId,
openSecureChannelRequest);
+
+
context.sendRequest(new OpcuaAPU(openRequest))
.expectResponse(OpcuaAPU.class, REQUEST_TIMEOUT)
.check(p -> p.getMessage() instanceof OpcuaOpenResponse)
@@ -661,11 +655,6 @@ public class OpcuaProtocolLogic extends Plc4xProtocolBase<OpcuaAPU> implements H
return future;
}
- @Override
- protected void decode(ConversationContext<OpcuaAPU> context, OpcuaAPU msg) throws Exception {
- LOGGER.info("Error while reading value from OPC UA server error code 11");
- super.decode(context, msg);
- }
private long getCurrentDateTime() {
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
index ea5c894..9aba597 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/Plc4xNettyWrapper.java
@@ -98,6 +98,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
@Override
public ExpectRequestContext<T> expectRequest(Class<T> clazz, Duration timeout) {
+ System.out.println("Testing BH ---");
return new DefaultExpectRequestContext<>(handler -> {
logger.trace("Adding Request Handler ...");
registeredHandlers.add(handler);
@@ -235,6 +236,7 @@ public class Plc4xNettyWrapper<T> extends MessageToMessageCodec<T, Object> {
@Override
public SendRequestContext<T1> sendRequest(T1 packet) {
+ System.out.println("Testing BH ---&&");
return new DefaultSendRequestContext<>(handler -> {
logger.trace("Adding Response Handler ...");
registeredHandlers.add(handler);
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
index 3a718a6..1fc70ad 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
@@ -18,21 +18,14 @@
*/
package org.apache.plc4x.java.spi.connection;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelPipeline;
+import io.netty.channel.*;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
import org.apache.plc4x.java.api.exceptions.PlcIoException;
import org.apache.plc4x.java.spi.configuration.Configuration;
import org.apache.plc4x.java.spi.configuration.ConfigurationFactory;
-import org.apache.plc4x.java.spi.events.CloseConnectionEvent;
-import org.apache.plc4x.java.spi.events.ConnectEvent;
-import org.apache.plc4x.java.spi.events.ConnectedEvent;
+import org.apache.plc4x.java.spi.events.*;
import org.apache.plc4x.java.spi.optimizer.BaseOptimizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +33,7 @@ import org.apache.plc4x.java.api.value.PlcValueHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
public class DefaultNettyPlcConnection extends AbstractPlcConnection implements ChannelExposingConnection {
@@ -54,6 +48,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
protected final ChannelFactory channelFactory;
protected final boolean awaitSessionSetupComplete;
protected final ProtocolStackConfigurer stackConfigurer;
+ private final CompletableFuture<Void> sessionDisconnectCompleteFuture = new CompletableFuture<>();
protected Channel channel;
protected boolean connected;
@@ -87,7 +82,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
ConfigurationFactory.configure(configuration, channelFactory);
// Have the channel factory create a new channel instance.
- channel = channelFactory.createChannel(getChannelHandler(sessionSetupCompleteFuture));
+ channel = channelFactory.createChannel(getChannelHandler(sessionSetupCompleteFuture, sessionDisconnectCompleteFuture));
channel.closeFuture().addListener(future -> {
if (!sessionSetupCompleteFuture.isDone()) {
sessionSetupCompleteFuture.completeExceptionally(
@@ -115,8 +110,15 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
@Override
public void close() throws PlcConnectionException {
// TODO call protocols close method
+
+ channel.pipeline().fireUserEventTriggered(new DisconnectEvent());
+ try {
+ sessionDisconnectCompleteFuture.get(10000L, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ //Do Nothing
+ }
channel.pipeline().fireUserEventTriggered(new CloseConnectionEvent());
- // Close channel
+
channel.close().awaitUninterruptibly();
channel = null;
connected = false;
@@ -136,7 +138,7 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
return channel;
}
- public ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture) {
+ public ChannelHandler getChannelHandler(CompletableFuture<Void> sessionSetupCompleteFuture, CompletableFuture<Void> sessionDisconnectCompleteFuture) {
if (stackConfigurer == null) {
throw new IllegalStateException("No Protocol Stack Configurer is given!");
}
@@ -153,6 +155,8 @@ public class DefaultNettyPlcConnection extends AbstractPlcConnection implements
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof ConnectedEvent) {
sessionSetupCompleteFuture.complete(null);
+ } else if (evt instanceof DisconnectedEvent) {
+ sessionDisconnectCompleteFuture.complete(null);
} else {
super.userEventTriggered(ctx, evt);
}
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/events/CloseConnectionEvent.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/events/CloseConnectionEvent.java
index 6cb3404..912dcd1 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/events/CloseConnectionEvent.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/events/CloseConnectionEvent.java
@@ -18,6 +18,10 @@
*/
package org.apache.plc4x.java.spi.events;
+import io.netty.util.concurrent.CompleteFuture;
+
+import java.util.concurrent.CompletableFuture;
+
/** Signals the Protocol to Close the Connection */
public class CloseConnectionEvent {
}
diff --git a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
index 64c54f6..8b43f39 100644
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/internal/DefaultExpectRequestContext.java
@@ -51,6 +51,7 @@ public class DefaultExpectRequestContext<T> implements ConversationContext.Expec
private HandlerRegistration registration;
public DefaultExpectRequestContext(Consumer<HandlerRegistration> finisher, Class<T> expectClazz, Duration timeout, ConversationContext context) {
+ System.out.println("Testing BH");
this.finisher = finisher;
this.expectClazz = expectClazz;
this.timeout = timeout;