You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/13 23:08:00 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5587

Repository: activemq
Updated Branches:
  refs/heads/master 1cab71386 -> 8b36701fc


https://issues.apache.org/jira/browse/AMQ-5587

Add a connect check in the inactivity monitor to account for opened
connections that might drop without being spotted.  In the case where
the connect frame is lost, this can lead to connections that aren't
fully opened and won't be cleaned up until the broker detects that the
socket has failed.

By default the connection timer is set to 30 seconds; if no connect
frame is read by then, the connection is dropped.  The timeout can be
configured (in milliseconds) via the 'transport.connectAttemptTimeout'
URI option; a value of zero or less disables the connect check.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b36701f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b36701f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b36701f

Branch: refs/heads/master
Commit: 8b36701fc351080b5a278f7a88f62f65886c645e
Parents: 1cab713
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Feb 13 17:07:33 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Feb 13 17:07:33 2015 -0500

----------------------------------------------------------------------
 .../transport/amqp/AMQPSslTransportFactory.java |   7 +-
 .../transport/amqp/AmqpInactivityMonitor.java   | 159 +++++++++++++++++
 .../transport/amqp/AmqpNioSslTransport.java     |   4 -
 .../transport/amqp/AmqpNioTransportFactory.java |  11 +-
 .../transport/amqp/AmqpProtocolConverter.java   |  17 +-
 .../activemq/transport/amqp/AmqpTransport.java  |   3 +
 .../transport/amqp/AmqpTransportFactory.java    |   7 +-
 .../transport/amqp/AmqpTransportFilter.java     |  28 +++
 .../activemq/transport/amqp/AmqpWireFormat.java |  11 ++
 .../transport/amqp/AmqpConnectTimeoutTest.java  | 169 +++++++++++++++++++
 .../transport/amqp/AmqpTestSupport.java         |  12 +-
 11 files changed, 410 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
index 4d7af7e..12bd526 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPSslTransportFactory.java
@@ -66,7 +66,10 @@ public class AMQPSslTransportFactory extends SslTransportFactory implements Brok
     }
 
     @Override
-    protected boolean isUseInactivityMonitor(Transport transport) {
-        return false;
+    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+        filter.setInactivityMonitor(monitor);
+        return monitor;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
new file mode 100644
index 0000000..065559d
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+import java.util.Timer;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.thread.SchedulerTimerTask;
+import org.apache.activemq.transport.AbstractInactivityMonitor;
+import org.apache.activemq.transport.InactivityIOException;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.util.ThreadPoolUtils;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Transport filter that fails an AMQP connection whose connect attempt is not completed in time. */
+public class AmqpInactivityMonitor extends TransportFilter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpInactivityMonitor.class);
+
+    // Shared by all monitors; mutated only while holding the AbstractInactivityMonitor.class monitor.
+    private static volatile ThreadPoolExecutor ASYNC_TASKS; // volatile: the timer thread reads it lock-free
+    private static int CHECKER_COUNTER;
+    private static Timer ACTIVITY_CHECK_TIMER;
+
+    private final AtomicBoolean failed = new AtomicBoolean(false);
+    private IAmqpProtocolConverter protocolConverter;
+    private long connectionTimeout = AmqpWireFormat.DEFAULT_CONNECTION_TIMEOUT;
+    private volatile SchedulerTimerTask connectCheckerTask; // volatile: also read by the timer thread
+    private final Runnable connectChecker = new Runnable() {
+        private final long startTime = System.currentTimeMillis(); // fixed when the monitor is constructed
+        @Override
+        public void run() {
+            final ThreadPoolExecutor executor = ASYNC_TASKS; // snapshot: a concurrent stop may null the field
+            if ((System.currentTimeMillis() - startTime) < connectionTimeout || connectCheckerTask == null || executor == null || executor.isShutdown()) {
+                return;
+            }
+            LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AmqpInactivityMonitor.this);
+            try {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        onException(new InactivityIOException("Channel was inactive for too (>" + (connectionTimeout) + ") long: " + next.getRemoteAddress()));
+                    }
+                });
+            } catch (java.util.concurrent.RejectedExecutionException ignored) {
+                // Shut down after the check above; must not escape into the shared timer thread.
+            }
+        }
+    };
+
+    public AmqpInactivityMonitor(Transport next, WireFormat wireFormat) {
+        super(next);
+    }
+
+    @Override
+    public void start() throws Exception {
+        next.start();
+    }
+
+    @Override
+    public void stop() throws Exception {
+        stopConnectChecker();
+        next.stop();
+    }
+
+    /** Handles only the first failure: stops the check, then notifies the converter (if set) and the listener. */
+    @Override
+    public void onException(IOException error) {
+        if (failed.compareAndSet(false, true)) {
+            stopConnectChecker();
+            if (protocolConverter != null) {
+                protocolConverter.onAMQPException(error);
+            }
+            transportListener.onException(error);
+        }
+    }
+
+    public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
+        this.protocolConverter = protocolConverter;
+    }
+
+    public IAmqpProtocolConverter getProtocolConverter() {
+        return protocolConverter;
+    }
+
+    /** Schedules the connect check; a timeout of zero or less, or an already active check, is a no-op. */
+    synchronized void startConnectChecker(long connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+        if (connectionTimeout > 0 && connectCheckerTask == null) {
+            connectCheckerTask = new SchedulerTimerTask(connectChecker);
+            long connectionCheckInterval = Math.min(connectionTimeout, 1000);
+            synchronized (AbstractInactivityMonitor.class) {
+                if (CHECKER_COUNTER == 0) {
+                    ASYNC_TASKS = createExecutor();
+                    ACTIVITY_CHECK_TIMER = new Timer("AMQP InactivityMonitor State Check", true);
+                }
+                CHECKER_COUNTER++;
+                ACTIVITY_CHECK_TIMER.schedule(connectCheckerTask, connectionCheckInterval, connectionCheckInterval);
+            }
+        }
+    }
+
+    /** Cancels this monitor's check and releases the shared timer and executor once no check remains. */
+    synchronized void stopConnectChecker() {
+        if (connectCheckerTask != null) {
+            connectCheckerTask.cancel();
+            connectCheckerTask = null;
+            synchronized (AbstractInactivityMonitor.class) {
+                ACTIVITY_CHECK_TIMER.purge();
+                CHECKER_COUNTER--;
+                if (CHECKER_COUNTER == 0) {
+                    ACTIVITY_CHECK_TIMER.cancel();
+                    ACTIVITY_CHECK_TIMER = null;
+                    ThreadPoolUtils.shutdown(ASYNC_TASKS);
+                    ASYNC_TASKS = null;
+                }
+            }
+        }
+    }
+
+    // Static so the shared executor never keeps a reference to the monitor that happened to create it.
+    private static final ThreadFactory FACTORY = new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable runnable) {
+            Thread thread = new Thread(runnable, "AmqpInactivityMonitor Async Task: " + runnable);
+            thread.setDaemon(true);
+            return thread;
+        }
+    };
+
+    private static ThreadPoolExecutor createExecutor() {
+        ThreadPoolExecutor exec = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), FACTORY);
+        exec.allowCoreThreadTimeOut(true);
+        return exec;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
index a722404..3276be9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
@@ -26,13 +26,9 @@ import javax.net.SocketFactory;
 
 import org.apache.activemq.transport.nio.NIOSSLTransport;
 import org.apache.activemq.wireformat.WireFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class AmqpNioSslTransport extends NIOSSLTransport {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AmqpNioSslTransport.class);
-
     private final AmqpFrameParser frameReader = new AmqpFrameParser(this);
 
     public AmqpNioSslTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
index b017937..bbcac47 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
@@ -86,13 +86,16 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok
     }
 
     @Override
-    public void setBrokerService(BrokerService brokerService) {
-        this.brokerService = brokerService;
+    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+        filter.setInactivityMonitor(monitor);
+        return monitor;
     }
 
     @Override
-    protected boolean isUseInactivityMonitor(Transport transport) {
-        return false;
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 131df8f..425b264 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -141,6 +141,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
     public AmqpProtocolConverter(AmqpTransport transport, BrokerService brokerService) {
         this.amqpTransport = transport;
+        AmqpInactivityMonitor monitor = transport.getInactivityMonitor();
+        if (monitor != null) {
+            monitor.setProtocolConverter(this);
+        }
         this.amqpWireFormat = transport.getWireFormat();
         this.brokerService = brokerService;
 
@@ -513,7 +517,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
         connectionInfo.setResponseRequired(true);
         connectionInfo.setConnectionId(connectionId);
-        // configureInactivityMonitor(connect.keepAlive());
+
+        configureInactivityMonitor();
 
         String clientId = protonConnection.getRemoteContainer();
         if (clientId != null && !clientId.isEmpty()) {
@@ -578,6 +583,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
     }
 
+    private void configureInactivityMonitor() {
+        AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor();
+        if (monitor == null) {
+            return;
+        }
+
+        monitor.stopConnectChecker();
+    }
+
     InboundTransformer inboundTransformer;
 
     protected InboundTransformer getInboundTransformer() {
@@ -648,7 +662,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         private final ActiveMQDestination destination;
         private boolean closed;
         private final boolean anonymous;
-        private MessageId lastDispatched;
 
         public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) {
             this.producerId = producerId;

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
index 58b776f..698d7b7 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java
@@ -50,4 +50,7 @@ public interface AmqpTransport {
 
     public void setProtocolConverter(IAmqpProtocolConverter protocolConverter);
 
+    public void setInactivityMonitor(AmqpInactivityMonitor monitor);
+
+    public AmqpInactivityMonitor getInactivityMonitor();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
index 3ca8ea1..6dce2c0 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFactory.java
@@ -66,7 +66,10 @@ public class AmqpTransportFactory extends TcpTransportFactory implements BrokerS
     }
 
     @Override
-    protected boolean isUseInactivityMonitor(Transport transport) {
-        return false;
+    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+        AmqpTransportFilter filter = transport.narrow(AmqpTransportFilter.class);
+        filter.setInactivityMonitor(monitor);
+        return monitor;
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index 5fb7a04..3e361c2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -44,6 +44,7 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpTransportFilter.class.getPackage().getName() + ".FRAMES");
     private IAmqpProtocolConverter protocolConverter;
     private AmqpWireFormat wireFormat;
+    private AmqpInactivityMonitor monitor;
 
     private boolean trace;
     private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
@@ -58,6 +59,15 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     }
 
     @Override
+    public void start() throws Exception {
+        if (monitor != null) {
+            monitor.setProtocolConverter(protocolConverter);
+            monitor.startConnectChecker(getConnectAttemptTimeout());
+        }
+        super.start();
+    }
+
+    @Override
     public void oneway(Object o) throws IOException {
         try {
             final Command command = (Command) o;
@@ -184,4 +194,22 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     public void setProducerCredit(int producerCredit) {
         protocolConverter.setProducerCredit(producerCredit);
     }
+
+    @Override
+    public void setInactivityMonitor(AmqpInactivityMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public AmqpInactivityMonitor getInactivityMonitor() {
+        return monitor;
+    }
+
+    public long getConnectAttemptTimeout() {
+        return wireFormat.getConnectAttemptTimeout();
+    }
+
+    public void setConnectAttemptTimeout(long connectAttemptTimeout) {
+        wireFormat.setConnectAttemptTimeout(connectAttemptTimeout);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index 0fd6140..dc0e3d5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -36,11 +36,14 @@ public class AmqpWireFormat implements WireFormat {
 
     public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
     public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
+    public static final long DEFAULT_CONNECTION_TIMEOUT = 30000L;
+
     private static final int SASL_PROTOCOL = 3;
 
     private int version = 1;
     private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
+    private long connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
 
     private boolean magicRead = false;
     private ResetListener resetListener;
@@ -196,4 +199,12 @@ public class AmqpWireFormat implements WireFormat {
     public void setAllowNonSaslConnections(boolean allowNonSaslConnections) {
         this.allowNonSaslConnections = allowNonSaslConnections;
     }
+
+    public long getConnectAttemptTimeout() {
+        return connectAttemptTimeout;
+    }
+
+    public void setConnectAttemptTimeout(long connectAttemptTimeout) {
+        this.connectAttemptTimeout = connectAttemptTimeout;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java
new file mode 100644
index 0000000..cd9a8bc
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that a client which opens a socket but never sends the connect performative is
+ * dropped by the broker once 'transport.connectAttemptTimeout' (2000 ms in this test) expires.
+ */
+@RunWith(Parameterized.class)
+public class AmqpConnectTimeoutTest extends AmqpTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpConnectTimeoutTest.class);
+
+    private volatile Socket connection; // volatile: assigned on the client thread, closed in tearDown()
+    protected boolean useSSL;
+    protected String connectorScheme;
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {"amqp", false},
+                {"amqp+ssl", true},
+                {"amqp+nio", false},
+                {"amqp+nio+ssl", true}
+            });
+    }
+
+    public AmqpConnectTimeoutTest(String connectorScheme, boolean useSSL) {
+        this.connectorScheme = connectorScheme;
+        this.useSSL = useSSL;
+    }
+
+    protected String getConnectorScheme() {
+        return connectorScheme;
+    }
+
+    protected boolean isUseSSL() {
+        return useSSL;
+    }
+
+    @Override
+    protected boolean isUseSslConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseNioConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseNioPlusSslConnector() {
+        return true;
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Throwable ignored) { /* best-effort close; the socket may already be closed */ }
+            connection = null;
+        }
+        super.tearDown();
+    }
+
+    @Override
+    public String getAdditionalConfig() {
+        return "&transport.connectAttemptTimeout=2000";
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testInactivityMonitor() throws Exception {
+        // Client thread: connect, write a single byte and send nothing further.
+        Thread t1 = new Thread() {
+
+            @Override
+            public void run() {
+                try {
+                    connection = createConnection();
+                    connection.getOutputStream().write('A');
+                    connection.getOutputStream().flush();
+                } catch (Exception ex) {
+                    LOG.error("unexpected exception on connect/disconnect", ex);
+                    exceptions.add(ex);
+                }
+            }
+        };
+
+        t1.start();
+
+        assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+             @Override
+             public boolean isSatisified() throws Exception {
+                 return 1 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
+             }
+         }));
+
+        // and it should be closed due to inactivity
+        assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
+            }
+        }));
+
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
+
+    protected Socket createConnection() throws IOException {
+
+        int port = 0;
+        switch (connectorScheme) {
+            case "amqp":
+                port = this.port;
+                break;
+            case "amqp+ssl":
+                port = this.sslPort;
+                break;
+            case "amqp+nio":
+                port = this.nioPort;
+                break;
+            case "amqp+nio+ssl":
+                port = this.nioPlusSslPort;
+                break;
+            default:
+                throw new IOException("Invalid AMQP connector scheme passed to test.");
+        }
+
+        if (isUseSSL()) {
+            return SSLSocketFactory.getDefault().createSocket("localhost", port);
+        } else {
+            return new Socket("localhost", port);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b36701f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 23a964d..c7866b7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -134,25 +134,25 @@ public class AmqpTestSupport {
         }
         if (isUseTcpConnector()) {
             connector = brokerService.addConnector(
-                "amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer());
+                "amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
             port = connector.getConnectUri().getPort();
             LOG.debug("Using amqp port " + port);
         }
         if (isUseSslConnector()) {
             connector = brokerService.addConnector(
-                "amqp+ssl://0.0.0.0:" + sslPort + "?transport.transformer=" + getAmqpTransformer());
+                "amqp+ssl://0.0.0.0:" + sslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
             sslPort = connector.getConnectUri().getPort();
             LOG.debug("Using amqp+ssl port " + sslPort);
         }
         if (isUseNioConnector()) {
             connector = brokerService.addConnector(
-                "amqp+nio://0.0.0.0:" + nioPort + "?transport.transformer=" + getAmqpTransformer());
+                "amqp+nio://0.0.0.0:" + nioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
             nioPort = connector.getConnectUri().getPort();
             LOG.debug("Using amqp+nio port " + nioPort);
         }
         if (isUseNioPlusSslConnector()) {
             connector = brokerService.addConnector(
-                "amqp+nio+ssl://0.0.0.0:" + nioPlusSslPort + "?transport.transformer=" + getAmqpTransformer());
+                "amqp+nio+ssl://0.0.0.0:" + nioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
             nioPlusSslPort = connector.getConnectUri().getPort();
             LOG.debug("Using amqp+nio+ssl port " + nioPlusSslPort);
         }
@@ -182,6 +182,10 @@ public class AmqpTestSupport {
         return "jms";
     }
 
+    protected String getAdditionalConfig() {
+        return "";
+    }
+
     public void startBroker() throws Exception {
         if (brokerService != null) {
             throw new IllegalStateException("Broker is already created.");