You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/03/26 13:37:16 UTC
qpid-broker-j git commit: QPID-8089: [Broker-J][HTTP Management]
Schedule the connector shutdown once the endpoints have closed - avoids
sporadic test fail
Repository: qpid-broker-j
Updated Branches:
refs/heads/master a7e105751 -> 2c7ec1509
QPID-8089: [Broker-J][HTTP Management] Schedule the connector shutdown once the endpoints have closed - avoids sporadic test fail
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2c7ec150
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2c7ec150
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2c7ec150
Branch: refs/heads/master
Commit: 2c7ec1509a3d3080d14a038dd6a983bf0928136d
Parents: a7e1057
Author: Keith Wall <kw...@apache.org>
Authored: Mon Mar 26 14:34:51 2018 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Mar 26 14:34:51 2018 +0100
----------------------------------------------------------------------
.../management/plugin/HttpManagement.java | 103 ++++++++++++++-----
1 file changed, 79 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2c7ec150/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index b263c69..f1b781d 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -37,7 +38,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
@@ -51,6 +51,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ssl.SslHandshakeListener;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
@@ -74,6 +76,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.messages.ManagementConsoleMessages;
import org.apache.qpid.server.logging.messages.PortMessages;
import org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter;
@@ -299,6 +302,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
for (HttpPort<?> port : ports)
{
ServerConnector connector = createConnector(port, server);
+ connector.addBean(new ConnectionTrackingListener());
server.addConnector(connector);
_portConnectorMap.put(port, connector);
lastPort = port.getPort();
@@ -898,6 +902,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
try
{
connector = createConnector(port, server);
+ connector.addBean(new ConnectionTrackingListener());
server.addConnector(connector);
connector.start();
_portConnectorMap.put(port, connector);
@@ -924,37 +929,87 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem
Server server = _server;
if (server != null)
{
- ServerConnector connector = _portConnectorMap.remove(port);
+ final ServerConnector connector = _portConnectorMap.remove(port);
if (connector != null)
{
- int localPort = connector.getLocalPort();
- try
+ final int localPort = connector.getLocalPort();
+
+ final ConnectionTrackingListener tracker = connector.getBean(ConnectionTrackingListener.class);
+ // closes the server socket - we will see no new connections arriving
+ connector.close();
+ // minimise the timeout of endpoints so they close in a timely fashion
+ connector.setIdleTimeout(1);
+ connector.getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(1));
+ LOGGER.debug("Connector has {} connection(s)", tracker.getConnectionCount());
+
+ final TaskExecutor taskExecutor = getBroker().getTaskExecutor();
+ tracker.getAllClosedFuture().addListener(new Runnable()
{
- connector.close();
- }
- catch (Exception e)
- {
- LOGGER.warn("Failed to close connector for http port {}", port, e);
- }
- getBroker().scheduleTask(0, TimeUnit.SECONDS, () -> {
- LOGGER.debug("Stopping connector for http port {}", localPort);
- try
- {
- connector.stop();
- }
- catch (Exception e)
- {
- LOGGER.warn("Failed to stop connector for http port {}", localPort, e);
- }
- finally
+ @Override
+ public void run()
{
- logOperationalShutdownMessage(localPort);
- _server.removeConnector(connector);
+ final int connectionCount = tracker.getConnectionCount();
+ if (connectionCount == 0)
+ {
+ LOGGER.debug("Stopping connector for http port {}", localPort);
+ try
+ {
+ connector.stop();
+ }
+ catch (Exception e)
+ {
+ LOGGER.warn("Failed to stop connector for http port {}", localPort, e);
+ }
+ finally
+ {
+ logOperationalShutdownMessage(localPort);
+ _server.removeConnector(connector);
+ }
+ }
+ else
+ {
+ LOGGER.debug("Connector still has {} connection(s)", tracker.getConnectionCount());
+ connector.getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(1));
+ tracker.getAllClosedFuture()
+ .addListener(this, taskExecutor);
+ }
}
- });
+ }, taskExecutor);
}
}
}
}
+
+ }
+
+ /**
+  * Jetty {@link Connection.Listener} that records the connections currently open on a connector,
+  * so that the port-close logic can wait for them all to close before stopping the connector.
+  * <p>
+  * The tracking map is a concurrent map: {@link #onOpened} and {@link #onClosed} are listener
+  * callbacks (presumably run on Jetty's own threads - confirm against the Jetty documentation),
+  * whereas {@link #getAllClosedFuture()} and {@link #getConnectionCount()} are called from the
+  * code that closes the port, which runs its completion listener on the broker task executor.
+  */
+ private static class ConnectionTrackingListener implements Connection.Listener
+ {
+     // One not-yet-completed future per open connection; an entry is removed when the connection closes.
+     private final Map<Connection, SettableFuture<Void>> _closeFutures = new ConcurrentHashMap<>();
+
+     /**
+      * Starts tracking a newly opened connection.
+      *
+      * @param connection the connection that has been opened
+      */
+     @Override
+     public void onOpened(final Connection connection)
+     {
+         _closeFutures.put(connection, SettableFuture.create());
+     }
+
+     /**
+      * Stops tracking a connection and completes its close future, if it was being tracked.
+      *
+      * @param connection the connection that has been closed
+      */
+     @Override
+     public void onClosed(final Connection connection)
+     {
+         final SettableFuture<Void> closeFuture = _closeFutures.remove(connection);
+         if (closeFuture != null)
+         {
+             closeFuture.set(null);
+         }
+     }
+
+     /**
+      * Returns a future combining the close futures of the connections tracked at the time of the call.
+      * Connections opened after the call are not part of the returned future, so callers should
+      * re-check {@link #getConnectionCount()} once it completes.
+      *
+      * @return future that completes when all connections tracked at call time have closed
+      */
+     public ListenableFuture<List<Void>> getAllClosedFuture()
+     {
+         return Futures.allAsList(_closeFutures.values());
+     }
+
+     /**
+      * @return the number of connections currently being tracked as open
+      */
+     public int getConnectionCount()
+     {
+         return _closeFutures.size();
+     }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org