You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2021/12/02 19:32:07 UTC

[plc4x] 01/02: Merge remote-tracking branch 'origin/bugfix/close-eventloop-after-channel' into develop

This is an automated email from the ASF dual-hosted git repository.

sruehl pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit ec3ff0cdfda097d3ae6e9f15f041e4c5e9c05815
Merge: c56e8bb 5691692
Author: Sebastian Rühl <sr...@apache.org>
AuthorDate: Thu Dec 2 20:25:54 2021 +0100

    Merge remote-tracking branch 'origin/bugfix/close-eventloop-after-channel' into develop
    
    # Conflicts:
    #	plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/ChannelFactory.java
    #	plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
    #	plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java

 .../plc4x/java/spi/connection/ChannelFactory.java    |  5 +++++
 .../spi/connection/DefaultNettyPlcConnection.java    |  3 +++
 .../java/spi/connection/NettyChannelFactory.java     | 20 ++++++++++++++++++++
 3 files changed, 28 insertions(+)

diff --cc plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
index 68bcf3a,117728b..a5f783e
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/DefaultNettyPlcConnection.java
@@@ -143,29 -111,31 +143,32 @@@ public class DefaultNettyPlcConnection 
          }
      }
  
 -    /*@Override
 -    public CompletableFuture<Void> ping() {
 -        CompletableFuture<Void> future = new CompletableFuture<>();
 -        try {
 -            // Relay the actual pinging to the channel factory ...
 -            channelFactory.ping();
 -            // If we got here, the ping was successful.
 -            future.complete(null);
 -        } catch (PlcException e) {
 -            // If we got here, something went wrong.
 -            future.completeExceptionally(e);
 -        }
 -        return future;
 -    }*/
 -
 +    /**
 +     * Close the connection by firstly calling disconnect and waiting for a DisconnectedEvent occurs and then calling
 +     * Close() to finish up any other clean up.
 +     * @throws PlcConnectionException
 +     */
      @Override
      public void close() throws PlcConnectionException {
 -        // TODO call protocols close method
 +        logger.debug("Closing connection to PLC, await for disconnect = {}", awaitSessionDisconnectComplete);
 +        channel.pipeline().fireUserEventTriggered(new DisconnectEvent());
 +        try {
 +            if (awaitSessionDisconnectComplete) {
 +                sessionDisconnectCompleteFuture.get(DEFAULT_DISCONNECT_WAIT_TIME, TimeUnit.MILLISECONDS);
 +            }
 +        } catch (Exception e) {
 +            logger.error("Timeout while trying to close connection");
 +        }
          channel.pipeline().fireUserEventTriggered(new CloseConnectionEvent());
 -        // Close channel
          channel.close().awaitUninterruptibly();
  
 +        if (!sessionDisconnectCompleteFuture.isDone()) {
 +            sessionDisconnectCompleteFuture.complete(null);
 +        }
 +
+         // Shutdown the Worker Group
+         channelFactory.closeEventLoopForChannel(channel);
+ 
          channel = null;
          connected = false;
      }
diff --cc plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
index 28c563a,823fcd1..dfe4989
--- a/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
+++ b/plc4j/spi/src/main/java/org/apache/plc4x/java/spi/connection/NettyChannelFactory.java
@@@ -29,6 -30,9 +29,8 @@@ import org.slf4j.Logger
  import org.slf4j.LoggerFactory;
  
  import java.net.SocketAddress;
+ import java.util.Map;
 -import java.util.Properties;
+ import java.util.concurrent.ConcurrentHashMap;
  
  /**
   * Adapter with sensible defaults for a Netty Based Channel Factory.
@@@ -39,7 -43,10 +41,9 @@@
  public abstract class NettyChannelFactory implements ChannelFactory {
  
      private static final Logger logger = LoggerFactory.getLogger(NettyChannelFactory.class);
 -    private static final int PING_TIMEOUT_MS = 1_000;
  
+     private final Map<Channel, EventLoopGroup> eventLoops = new ConcurrentHashMap<>();
+ 
      /**
       * TODO should be removed together with the Constructor.
       */
@@@ -131,4 -141,55 +138,17 @@@
          }
      }
  
+     @Override
+     public void closeEventLoopForChannel(Channel channel) {
+         if (eventLoops.containsKey(channel)) {
+             logger.info("Channel is closed, closing worker Group also");
+             EventLoopGroup eventExecutors = eventLoops.get(channel);
+             eventLoops.remove(channel);
+             eventExecutors.shutdownGracefully().awaitUninterruptibly();
+             logger.info("Worker Group was closed successfully!");
+         } else {
+             logger.warn("Trying to remove EventLoop for Channel {} but have none stored", channel);
+         }
+     }
+ 
 -    // TODO do we want to keep this like that?
 -    /*@Override
 -    public void ping() throws PlcException {
 -        // TODO: Replace this check with a more accurate one ...
 -        InetSocketAddress address = new InetSocketAddress(getAddress(), getPort());
 -        try (Socket s = new Socket()) {
 -            s.connect(address, PING_TIMEOUT_MS);
 -            // TODO keep the address for a (timely) next request???
 -            s.setReuseAddress(true);
 -        } catch (Exception e) {
 -            throw new PlcConnectionException("Unable to ping remote host");
 -        }
 -    }
 -
 -    public Properties getProperties() {
 -        // Null Safety for older implementations
 -        if (properties == null) {
 -            return new Properties();
 -        }
 -        return properties;
 -    }
 -
 -    public void setProperties(Properties properties) {
 -        this.properties = properties;
 -    }
 -
 -    protected String getProperty(String key) {
 -        return ((String) getProperties().get(key));
 -    }
 -
 -    protected boolean hasProperty(String key) {
 -        return getProperties().contains(key);
 -    }
 -
 -    protected String getPropertyOrDefault(String key, String defaultValue) {
 -        return getProperties().getProperty(key, defaultValue);
 -    }*/
 -
  }