You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by jf...@apache.org on 2019/08/04 18:18:42 UTC

[plc4x] branch too-many-open-files updated: PLC4X-139 found the problem.

This is an automated email from the ASF dual-hosted git repository.

jfeinauer pushed a commit to branch too-many-open-files
in repository https://gitbox.apache.org/repos/asf/plc4x.git


The following commit(s) were added to refs/heads/too-many-open-files by this push:
     new d38dfb6  PLC4X-139 found the problem.
d38dfb6 is described below

commit d38dfb625d212a50a0002fcfbe94dd8e68428536
Author: Julian Feinauer <j....@pragmaticminds.de>
AuthorDate: Sun Aug 4 20:18:24 2019 +0200

    PLC4X-139 found the problem.
---
 .../plc4x/java/examples/helloplc4x/HelloPlc4x.java |  3 ++-
 .../base/connection/TcpSocketChannelFactory.java   | 25 +++++++++++++++++++---
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java b/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
index 8759322..58c33fb 100644
--- a/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
+++ b/plc4j/examples/hello-world-plc4x/src/main/java/org/apache/plc4x/java/examples/helloplc4x/HelloPlc4x.java
@@ -41,7 +41,8 @@ public class HelloPlc4x {
      * @param args ignored.
      */
     public static void main(String[] args) throws Exception {
-        IntStream.range(0, 1000).parallel().forEach(i -> createConnection(i));
+        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
+        IntStream.range(0, 100000).parallel().forEach(i -> createConnection(i));
     }
 
     private static void createConnection(int i) {
diff --git a/plc4j/protocols/driver-bases/tcp/src/main/java/org/apache/plc4x/java/base/connection/TcpSocketChannelFactory.java b/plc4j/protocols/driver-bases/tcp/src/main/java/org/apache/plc4x/java/base/connection/TcpSocketChannelFactory.java
index 8f3f101..ca1d5fb 100644
--- a/plc4j/protocols/driver-bases/tcp/src/main/java/org/apache/plc4x/java/base/connection/TcpSocketChannelFactory.java
+++ b/plc4j/protocols/driver-bases/tcp/src/main/java/org/apache/plc4x/java/base/connection/TcpSocketChannelFactory.java
@@ -25,8 +25,12 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
 import org.apache.plc4x.java.api.exceptions.PlcException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -34,6 +38,8 @@ import java.net.Socket;
 
 public class TcpSocketChannelFactory implements ChannelFactory {
 
+    private static final Logger logger = LoggerFactory.getLogger(TcpSocketChannelFactory.class);
+
     private static final int PING_TIMEOUT_MS = 1_000;
 
     private final InetAddress address;
@@ -49,14 +55,27 @@ public class TcpSocketChannelFactory implements ChannelFactory {
         throws PlcConnectionException {
         try {
             Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(new NioEventLoopGroup());
+            final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+            bootstrap.group(workerGroup);
             bootstrap.channel(NioSocketChannel.class);
             bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
             bootstrap.option(ChannelOption.TCP_NODELAY, true);
+            // TODO we should use an explicit (configurable?) timeout here
+            // bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
             bootstrap.handler(channelHandler);
             // Start the client.
-            ChannelFuture f = bootstrap.connect(address, port).sync();
-            f.awaitUninterruptibly();
+            final ChannelFuture f = bootstrap.connect(address, port);
+            f.addListener(new GenericFutureListener<Future<? super Void>>() {
+                @Override public void operationComplete(Future<? super Void> future) throws Exception {
+                    if (!future.isSuccess()) {
+                        logger.info("Unable to connect, shutting down worker thread.");
+                        workerGroup.shutdownGracefully();
+                    }
+                }
+            });
+            // Wait for sync
+            f.sync();
+            f.awaitUninterruptibly(); // jf: unsure if we need that
             // Wait till the session is finished initializing.
             return f.channel();
         } catch (InterruptedException e) {