You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/18 20:28:23 UTC

[activemq-artemis] 01/03: ARTEMIS-2408 Too many opened FDs after server stops

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit aeecb6f70b90c58e7f972b9f123dbe85d9ddf0c1
Author: brusdev <br...@gmail.com>
AuthorDate: Thu Jul 18 19:43:08 2019 +0200

    ARTEMIS-2408 Too many opened FDs after server stops
    
    Limit the time spent waiting for acceptors to stop.
    
    reapplying after commit 20ca827d79282ca41b4f60f28e6b09333c3d82d5
---
 .../artemis/core/server/ActiveMQComponent.java     | 16 +++-----------
 .../core/remoting/impl/netty/NettyAcceptor.java    | 25 +++++++++++-----------
 .../remoting/server/impl/RemotingServiceImpl.java  |  7 +++++-
 .../core/server/impl/ActiveMQServerImpl.java       |  2 ++
 .../artemis/tests/util/ActiveMQTestBase.java       |  4 ++++
 5 files changed, 28 insertions(+), 26 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQComponent.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQComponent.java
index dd60404..2b5c6c7 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQComponent.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQComponent.java
@@ -16,26 +16,16 @@
  */
 package org.apache.activemq.artemis.core.server;
 
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-
 public interface ActiveMQComponent {
 
    void start() throws Exception;
 
    void stop() throws Exception;
 
-   default Future<?> asyncStop() {
-      CompletableFuture<?> future = new CompletableFuture<>();
-
-      try {
-         stop();
-         future.complete(null);
-      } catch (Throwable t) {
-         future.completeExceptionally(t);
-      }
+   default void asyncStop(Runnable callback) throws Exception {
+      stop();
 
-      return future;
+      callback.run();
    }
 
    boolean isStarted();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 98fb676..ee8c44e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -36,8 +36,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -222,8 +222,6 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    final Executor failureExecutor;
 
-   private Future<?> asyncStopFuture = null;
-
    public NettyAcceptor(final String name,
                         final ClusterConnection clusterConnection,
                         final Map<String, Object> configuration,
@@ -649,15 +647,18 @@ public class NettyAcceptor extends AbstractAcceptor {
    }
 
    @Override
-   public java.util.concurrent.Future<?> asyncStop() {
-      stop();
+   public void stop() throws Exception {
+      CountDownLatch latch = new CountDownLatch(1);
+
+      asyncStop(() -> latch.countDown());
 
-      return asyncStopFuture;
+      latch.await();
    }
 
    @Override
-   public synchronized void stop() {
+   public synchronized void asyncStop(Runnable callback) {
       if (channelClazz == null) {
+         callback.run();
          return;
       }
 
@@ -693,11 +694,6 @@ public class NettyAcceptor extends AbstractAcceptor {
          }
       }
 
-      // Shutdown the EventLoopGroup if no new task was added for 100ms or if
-      // 3000ms elapsed.
-      asyncStopFuture = eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS);
-      eventLoopGroup = null;
-
       channelClazz = null;
 
       for (Connection connection : connections.values()) {
@@ -720,6 +716,11 @@ public class NettyAcceptor extends AbstractAcceptor {
       }
 
       paused = false;
+
+      // Shut down the EventLoopGroup once no new task has been added for 100ms,
+      // or after 3000ms have elapsed, whichever comes first.
+      eventLoopGroup.shutdownGracefully(100, 3000, TimeUnit.MILLISECONDS).addListener(f -> callback.run());
+      eventLoopGroup = null;
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 416e9a9..24dc727 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -79,6 +79,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
 
    private static final Logger logger = Logger.getLogger(RemotingServiceImpl.class);
 
+   private static final int ACCEPTOR_STOP_TIMEOUT = 3000;
+
    // Attributes ----------------------------------------------------
 
    private volatile boolean started = false;
@@ -407,13 +409,16 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
          conn.disconnect(criticalError);
       }
 
+      CountDownLatch acceptorCountDownLatch = new CountDownLatch(acceptors.size());
       for (Acceptor acceptor : acceptors.values()) {
          try {
-            acceptor.stop();
+            acceptor.asyncStop(() -> acceptorCountDownLatch.countDown());
          } catch (Throwable t) {
             ActiveMQServerLogger.LOGGER.errorStoppingAcceptor(acceptor.getName());
          }
       }
+      // Stopping an acceptor can block (e.g. a NettyAcceptor can hang on a network failure), so wait at most ACCEPTOR_STOP_TIMEOUT ms.
+      acceptorCountDownLatch.await(ACCEPTOR_STOP_TIMEOUT, TimeUnit.MILLISECONDS);
 
       acceptors.clear();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 68ca620..24be17c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1269,6 +1269,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          this.analyzer = null;
       }
 
+      activateCallbacks.clear();
+
       if (identity != null) {
          ActiveMQServerLogger.LOGGER.serverStopped("identity=" + identity + ",version=" + getVersion().getFullVersion(), tempNodeID, getUptime());
       } else {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index b48af77..19a3919 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -171,6 +171,9 @@ public abstract class ActiveMQTestBase extends Assert {
    @ClassRule
    public static ThreadLeakCheckRule leakCheckRule = new ThreadLeakCheckRule();
 
+   @Rule
+   public NoProcessFilesBehind noProcessFilesBehind = new NoProcessFilesBehind(-1, 1000);
+
    /** We should not under any circunstance create data outside of ./target
     *  if you have a test failing because because of this rule for any reason,
     *  even if you use afterClass events, move the test to ./target and always cleanup after
@@ -276,6 +279,7 @@ public abstract class ActiveMQTestBase extends Assert {
 
    @After
    public void tearDown() throws Exception {
+      noProcessFilesBehind.tearDown();
       closeAllSessionFactories();
       closeAllServerLocatorsFactories();