You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/03/26 13:37:16 UTC

qpid-broker-j git commit: QPID-8089: [Broker-J][HTTP Management] Schedule the connector shutdown once the endpoints have closed - avoids sporadic test fail

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master a7e105751 -> 2c7ec1509


QPID-8089: [Broker-J][HTTP Management] Schedule the connector shutdown once the endpoints have closed - avoids sporadic test fail


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2c7ec150
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2c7ec150
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2c7ec150

Branch: refs/heads/master
Commit: 2c7ec1509a3d3080d14a038dd6a983bf0928136d
Parents: a7e1057
Author: Keith Wall <kw...@apache.org>
Authored: Mon Mar 26 14:34:51 2018 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Mar 26 14:34:51 2018 +0100

----------------------------------------------------------------------
 .../management/plugin/HttpManagement.java       | 103 ++++++++++++++-----
 1 file changed, 79 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2c7ec150/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index b263c69..f1b781d 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -37,7 +38,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -51,6 +51,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.eclipse.jetty.io.Connection;
 import org.eclipse.jetty.io.ssl.SslHandshakeListener;
 import org.eclipse.jetty.server.ConnectionFactory;
 import org.eclipse.jetty.server.HttpConfiguration;
@@ -74,6 +76,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
 import org.apache.qpid.server.logging.messages.PortMessages;
 import org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter;
@@ -299,6 +302,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
         for (HttpPort<?> port : ports)
         {
             ServerConnector connector = createConnector(port, server);
+            connector.addBean(new ConnectionTrackingListener());
             server.addConnector(connector);
             _portConnectorMap.put(port, connector);
             lastPort = port.getPort();
@@ -898,6 +902,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
                     try
                     {
                         connector = createConnector(port, server);
+                        connector.addBean(new ConnectionTrackingListener());
                         server.addConnector(connector);
                         connector.start();
                         _portConnectorMap.put(port, connector);
@@ -924,37 +929,87 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
                 Server server = _server;
                 if (server != null)
                 {
-                    ServerConnector connector = _portConnectorMap.remove(port);
+                    final ServerConnector connector = _portConnectorMap.remove(port);
                     if (connector != null)
                     {
-                        int localPort = connector.getLocalPort();
-                        try
+                        final int localPort = connector.getLocalPort();
+
+                        final ConnectionTrackingListener tracker = connector.getBean(ConnectionTrackingListener.class);
+                        // closes the server socket - we will see no new connections arriving
+                        connector.close();
+                        // minimise the timeout of endpoints so they close in a timely fashion
+                        connector.setIdleTimeout(1);
+                        connector.getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(1));
+                        LOGGER.debug("Connector has {} connection(s)", tracker.getConnectionCount());
+
+                        final TaskExecutor taskExecutor = getBroker().getTaskExecutor();
+                        tracker.getAllClosedFuture().addListener(new Runnable()
                         {
-                            connector.close();
-                        }
-                        catch (Exception e)
-                        {
-                            LOGGER.warn("Failed to close connector for http port {}", port, e);
-                        }
-                        getBroker().scheduleTask(0, TimeUnit.SECONDS, () -> {
-                            LOGGER.debug("Stopping connector for http port {}", localPort);
-                            try
-                            {
-                                connector.stop();
-                            }
-                            catch (Exception e)
-                            {
-                                LOGGER.warn("Failed to stop connector for http port {}", localPort, e);
-                            }
-                            finally
+                            @Override
+                            public void run()
                             {
-                                logOperationalShutdownMessage(localPort);
-                                _server.removeConnector(connector);
+                                final int connectionCount = tracker.getConnectionCount();
+                                if (connectionCount == 0)
+                                {
+                                    LOGGER.debug("Stopping connector for http port {}", localPort);
+                                    try
+                                    {
+                                        connector.stop();
+                                    }
+                                    catch (Exception e)
+                                    {
+                                        LOGGER.warn("Failed to stop connector for http port {}", localPort, e);
+                                    }
+                                    finally
+                                    {
+                                        logOperationalShutdownMessage(localPort);
+                                        _server.removeConnector(connector);
+                                    }
+                                }
+                                else
+                                {
+                                    LOGGER.debug("Connector still has {} connection(s)", tracker.getConnectionCount());
+                                    connector.getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(1));
+                                    tracker.getAllClosedFuture()
+                                           .addListener(this, taskExecutor);
+                                }
                             }
-                        });
+                        }, taskExecutor);
                     }
                 }
             }
         }
+
+    }
+
+    /**
+     * Tracks the connections currently open on a connector so that the connector can be stopped
+     * once every one of them has closed.
+     * <p>
+     * The {@link Connection.Listener} callbacks are invoked by Jetty, whereas
+     * {@link #getAllClosedFuture()} and {@link #getConnectionCount()} are called by the port
+     * removal logic and by listeners run on the broker's task executor. These callers are not
+     * guaranteed to share a thread, so the futures are held in a concurrent map.
+     */
+    private static class ConnectionTrackingListener implements Connection.Listener
+    {
+        // One pending future per open connection; the entry is removed and completed on close.
+        private final Map<Connection, SettableFuture<Void>> _closeFutures = new ConcurrentHashMap<>();
+
+        /**
+         * Starts tracking a newly opened connection.
+         *
+         * @param connection the connection that has been opened
+         */
+        @Override
+        public void onOpened(final Connection connection)
+        {
+            _closeFutures.put(connection, SettableFuture.create());
+        }
+
+        /**
+         * Stops tracking a connection and completes its close future.
+         *
+         * @param connection the connection that has been closed
+         */
+        @Override
+        public void onClosed(final Connection connection)
+        {
+            SettableFuture<Void> closeFuture = _closeFutures.remove(connection);
+            // The connection may not be tracked (no matching onOpened was seen); nothing to complete then.
+            if (closeFuture != null)
+            {
+                closeFuture.set(null);
+            }
+        }
+
+        /**
+         * Returns a future that completes once every connection tracked at the time of this call
+         * has closed. Connections opened afterwards are not covered by the returned future, so
+         * callers should re-check {@link #getConnectionCount()} when it completes.
+         *
+         * @return a future over the close futures of the currently tracked connections; it is
+         *         already complete if no connections are tracked
+         */
+        public ListenableFuture<List<Void>> getAllClosedFuture()
+        {
+            // Futures.allAsList copies the values, so later map changes do not affect the result.
+            return Futures.allAsList(_closeFutures.values());
+        }
+
+        /**
+         * @return the number of connections currently tracked as open
+         */
+        public int getConnectionCount()
+        {
+            return _closeFutures.size();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org