You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/05/26 18:30:13 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5794
Repository: activemq
Updated Branches:
refs/heads/master ea2746b12 -> 3125caee5
https://issues.apache.org/jira/browse/AMQ-5794
implement transport.connectAttemptTimeout option across the transports
fro OpenWire, STOMP, AMQP and MQTT and add tests to cover.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3125caee
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3125caee
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3125caee
Branch: refs/heads/master
Commit: 3125caee5b6136233e05a58369b31c1a15abc389
Parents: ea2746b
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue May 26 12:29:23 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue May 26 12:29:23 2015 -0400
----------------------------------------------------------------------
.../transport/amqp/AmqpInactivityMonitor.java | 63 +++--
.../transport/AbstractInactivityMonitor.java | 112 +++++++-
.../activemq/transport/InactivityMonitor.java | 9 +
.../transport/ws/AbstractStompSocket.java | 1 +
.../ws/StompWSConnectionTimeoutTest.java | 82 ++++++
.../transport/ws/WSTransportTestSupport.java | 3 +
.../transport/mqtt/MQTTInactivityMonitor.java | 57 +++--
.../transport/stomp/StompInactivityMonitor.java | 4 +-
.../transport/stomp/StompTransportFilter.java | 22 +-
.../transport/stomp/StompWireFormat.java | 33 ++-
.../stomp/StompConnectTimeoutTest.java | 172 +++++++++++++
.../src/test/resources/log4j.properties | 2 +-
.../openwire/OpenWireConnectionTimeoutTest.java | 255 +++++++++++++++++++
13 files changed, 743 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
index 8e6f60d..e7255ea 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpInactivityMonitor.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.util.Timer;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -58,15 +59,22 @@ public class AmqpInactivityMonitor extends TransportFilter {
public void run() {
long now = System.currentTimeMillis();
- if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isTerminating()) {
+ if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AmqpInactivityMonitor.this.toString());
- ASYNC_TASKS.execute(new Runnable() {
- @Override
- public void run() {
- onException(new InactivityIOException(
- "Channel was inactive for too (>" + (connectionTimeout) + ") long: " + next.getRemoteAddress()));
+ try {
+ ASYNC_TASKS.execute(new Runnable() {
+ @Override
+ public void run() {
+ onException(new InactivityIOException(
+ "Channel was inactive for too (>" + (connectionTimeout) + ") long: " + next.getRemoteAddress()));
+ }
+ });
+ } catch (RejectedExecutionException ex) {
+ if (!ASYNC_TASKS.isShutdown()) {
+ LOG.error("Async connection timeout task was rejected from the executor: ", ex);
+ throw ex;
}
- });
+ }
}
}
};
@@ -76,26 +84,33 @@ public class AmqpInactivityMonitor extends TransportFilter {
@Override
public void run() {
- if (keepAliveTask != null && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
- ASYNC_TASKS.execute(new Runnable() {
- @Override
- public void run() {
- try {
- long nextIdleUpdate = amqpTransport.keepAlive();
- if (nextIdleUpdate > 0) {
- synchronized (AmqpInactivityMonitor.this) {
- if (keepAliveTask != null) {
- keepAliveTask = new SchedulerTimerTask(keepAlive);
- KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextIdleUpdate);
+ if (keepAliveTask != null && !ASYNC_TASKS.isShutdown()) {
+ try {
+ ASYNC_TASKS.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long nextIdleUpdate = amqpTransport.keepAlive();
+ if (nextIdleUpdate > 0) {
+ synchronized (AmqpInactivityMonitor.this) {
+ if (keepAliveTask != null) {
+ keepAliveTask = new SchedulerTimerTask(keepAlive);
+ KEEPALIVE_TASK_TIMER.schedule(keepAliveTask, nextIdleUpdate);
+ }
}
}
+ } catch (Exception ex) {
+ onException(new InactivityIOException(
+ "Exception while performing idle checks for connection: " + next.getRemoteAddress()));
}
- } catch (Exception ex) {
- onException(new InactivityIOException(
- "Exception while performing idle checks for connection: " + next.getRemoteAddress()));
}
+ });
+ } catch (RejectedExecutionException ex) {
+ if (!ASYNC_TASKS.isShutdown()) {
+ LOG.error("Async connection timeout task was rejected from the executor: ", ex);
+ throw ex;
}
- });
+ }
}
}
};
@@ -144,7 +159,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
synchronized (AbstractInactivityMonitor.class) {
if (CONNECTION_CHECK_TASK_COUNTER == 0) {
- if (ASYNC_TASKS == null) {
+ if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
ASYNC_TASKS = createExecutor();
}
CONNECTION_CHECK_TASK_TIMER = new Timer("AMQP InactivityMonitor State Check", true);
@@ -167,7 +182,7 @@ public class AmqpInactivityMonitor extends TransportFilter {
synchronized (AbstractInactivityMonitor.class) {
if (KEEPALIVE_TASK_COUNTER == 0) {
- if (ASYNC_TASKS == null) {
+ if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
ASYNC_TASKS = createExecutor();
}
KEEPALIVE_TASK_TIMER = new Timer("AMQP InactivityMonitor Idle Update", true);
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
index 8fbf623..7cc9205 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
@@ -30,7 +30,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.SchedulerTimerTask;
-import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +42,10 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
private static final Logger LOG = LoggerFactory.getLogger(AbstractInactivityMonitor.class);
+ private static final long DEFAULT_CHECK_TIME_MILLS = 30000;
+
private static ThreadPoolExecutor ASYNC_TASKS;
private static int CHECKER_COUNTER;
- private static long DEFAULT_CHECK_TIME_MILLS = 30000;
private static Timer READ_CHECK_TIMER;
private static Timer WRITE_CHECK_TIMER;
@@ -61,9 +61,11 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
private final ReentrantReadWriteLock sendLock = new ReentrantReadWriteLock();
+ private SchedulerTimerTask connectCheckerTask;
private SchedulerTimerTask writeCheckerTask;
private SchedulerTimerTask readCheckerTask;
+ private long connectAttemptTimeout = DEFAULT_CHECK_TIME_MILLS;
private long readCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long writeCheckTime = DEFAULT_CHECK_TIME_MILLS;
private long initialDelayTime = DEFAULT_CHECK_TIME_MILLS;
@@ -72,6 +74,34 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
protected WireFormat wireFormat;
+ private final Runnable connectChecker = new Runnable() {
+
+ private final long startTime = System.currentTimeMillis();
+
+ @Override
+ public void run() {
+ long now = System.currentTimeMillis();
+
+ if ((now - startTime) >= connectAttemptTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
+ LOG.debug("No connection attempt made in time for {}! Throwing InactivityIOException.", AbstractInactivityMonitor.this.toString());
+ try {
+ ASYNC_TASKS.execute(new Runnable() {
+ @Override
+ public void run() {
+ onException(new InactivityIOException(
+ "Channel was inactive for too (>" + (connectAttemptTimeout) + ") long: " + next.getRemoteAddress()));
+ }
+ });
+ } catch (RejectedExecutionException ex) {
+ if (!ASYNC_TASKS.isShutdown()) {
+ LOG.error("Async connection timeout task was rejected from the executor: ", ex);
+ throw ex;
+ }
+ }
+ }
+ }
+ };
+
private final Runnable readChecker = new Runnable() {
long lastRunTime;
@@ -151,7 +181,7 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
return;
}
- if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+ if (!commandSent.get() && useKeepAlive && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) {
LOG.trace("{} no message sent since last write check, sending a KeepAliveInfo", this);
try {
@@ -185,7 +215,7 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
};
});
} catch (RejectedExecutionException ex) {
- if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+ if (!ASYNC_TASKS.isShutdown()) {
LOG.error("Async write check was rejected from the executor: ", ex);
throw ex;
}
@@ -204,7 +234,7 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
LOG.trace("A receive is in progress, skipping read check.");
return;
}
- if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+ if (!commandReceived.get() && monitorStarted.get() && !ASYNC_TASKS.isShutdown()) {
LOG.debug("No message received since last read check for {}. Throwing InactivityIOException.", this);
try {
@@ -221,7 +251,7 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
};
});
} catch (RejectedExecutionException ex) {
- if (!ASYNC_TASKS.isTerminating() && !ASYNC_TASKS.isTerminated()) {
+ if (!ASYNC_TASKS.isShutdown()) {
LOG.error("Async read check was rejected from the executor: ", ex);
throw ex;
}
@@ -280,14 +310,14 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
// are performing a send we take a read lock. The inactivity monitor
// sends its Heart-beat commands under a write lock. This means that
// the MutexTransport is still responsible for synchronizing sends
- this.sendLock.readLock().lock();
+ sendLock.readLock().lock();
inSend.set(true);
try {
doOnewaySend(o);
} finally {
commandSent.set(true);
inSend.set(false);
- this.sendLock.readLock().unlock();
+ sendLock.readLock().unlock();
}
}
@@ -319,6 +349,14 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
useKeepAlive = val;
}
+ public long getConnectAttemptTimeout() {
+ return connectAttemptTimeout;
+ }
+
+ public void setConnectAttemptTimeout(long connectionTimeout) {
+ this.connectAttemptTimeout = connectionTimeout;
+ }
+
public long getReadCheckTime() {
return readCheckTime;
}
@@ -355,6 +393,52 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
return this.monitorStarted.get();
}
+ abstract protected boolean configuredOk() throws IOException;
+
+ public synchronized void startConnectCheckTask() {
+ startConnectCheckTask(getConnectAttemptTimeout());
+ }
+
+ public synchronized void startConnectCheckTask(long connectionTimeout) {
+ if (connectionTimeout <= 0) {
+ return;
+ }
+
+ LOG.info("Starting connection check task for: {}", this);
+
+ this.connectAttemptTimeout = connectionTimeout;
+
+ if (connectCheckerTask == null) {
+ connectCheckerTask = new SchedulerTimerTask(connectChecker);
+
+ synchronized (AbstractInactivityMonitor.class) {
+ if (CHECKER_COUNTER == 0) {
+ if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
+ ASYNC_TASKS = createExecutor();
+ }
+ if (READ_CHECK_TIMER == null) {
+ READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
+ }
+ }
+ CHECKER_COUNTER++;
+ READ_CHECK_TIMER.schedule(connectCheckerTask, connectionTimeout);
+ }
+ }
+ }
+
+ public synchronized void stopConnectCheckTask() {
+ if (connectCheckerTask != null) {
+ LOG.info("Stopping connection check task for: {}", this);
+ connectCheckerTask.cancel();
+ connectCheckerTask = null;
+
+ synchronized (AbstractInactivityMonitor.class) {
+ READ_CHECK_TIMER.purge();
+ CHECKER_COUNTER--;
+ }
+ }
+ }
+
protected synchronized void startMonitorThreads() throws IOException {
if (monitorStarted.get()) {
return;
@@ -375,11 +459,16 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
if (writeCheckTime > 0 || readCheckTime > 0) {
monitorStarted.set(true);
synchronized (AbstractInactivityMonitor.class) {
- if (CHECKER_COUNTER == 0) {
+ if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
ASYNC_TASKS = createExecutor();
+ }
+ if (READ_CHECK_TIMER == null) {
READ_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor ReadCheckTimer", true);
+ }
+ if (WRITE_CHECK_TIMER == null) {
WRITE_CHECK_TIMER = new Timer("ActiveMQ InactivityMonitor WriteCheckTimer", true);
}
+
CHECKER_COUNTER++;
if (readCheckTime > 0) {
READ_CHECK_TIMER.schedule(readCheckerTask, initialDelayTime, readCheckTime);
@@ -391,9 +480,8 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
}
}
- abstract protected boolean configuredOk() throws IOException;
-
protected synchronized void stopMonitorThreads() {
+ stopConnectCheckTask();
if (monitorStarted.compareAndSet(true, false)) {
if (readCheckerTask != null) {
readCheckerTask.cancel();
@@ -401,6 +489,7 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
if (writeCheckerTask != null) {
writeCheckerTask.cancel();
}
+
synchronized (AbstractInactivityMonitor.class) {
WRITE_CHECK_TIMER.purge();
READ_CHECK_TIMER.purge();
@@ -410,7 +499,6 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
READ_CHECK_TIMER.cancel();
WRITE_CHECK_TIMER = null;
READ_CHECK_TIMER = null;
- ThreadPoolUtils.shutdown(ASYNC_TASKS);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
index 8b312ee..288ec61 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
@@ -44,7 +44,15 @@ public class InactivityMonitor extends AbstractInactivityMonitor {
}
}
+ @Override
+ public void start() throws Exception {
+ startConnectCheckTask();
+ super.start();
+ }
+
+ @Override
protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
+ stopConnectCheckTask();
IOException error = null;
remoteWireFormatInfo = info;
try {
@@ -57,6 +65,7 @@ public class InactivityMonitor extends AbstractInactivityMonitor {
}
}
+ @Override
protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
localWireFormatInfo = info;
startMonitorThreads();
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
index 472561a..739e2fc 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
@@ -70,6 +70,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
protected void doStart() throws Exception {
socketTransportStarted.countDown();
stompInactivityMonitor.setTransportListener(getTransportListener());
+ stompInactivityMonitor.startConnectCheckTask();
}
//----- Abstract methods for subclasses to implement ---------------------//
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
new file mode 100644
index 0000000..b0ca372
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnectionTimeoutTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.util.Wait;
+import org.eclipse.jetty.websocket.WebSocketClient;
+import org.eclipse.jetty.websocket.WebSocketClientFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test that a STOMP WS connection drops if not CONNECT or STOMP frame sent in time.
+ */
+public class StompWSConnectionTimeoutTest extends WSTransportTestSupport {
+
+ protected WebSocketClient wsClient;
+ protected StompWSConnection wsStompConnection;
+
+ protected Vector<Throwable> exceptions = new Vector<Throwable>();
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ WebSocketClientFactory clientFactory = new WebSocketClientFactory();
+ clientFactory.start();
+
+ wsClient = clientFactory.newWebSocketClient();
+ wsStompConnection = new StompWSConnection();
+
+ wsClient.open(wsConnectUri, wsStompConnection);
+ if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) {
+ throw new IOException("Could not connect to STOMP WS endpoint");
+ }
+ }
+
+ protected String getConnectorScheme() {
+ return "ws";
+ }
+
+ @Test(timeout = 90000)
+ public void testInactivityMonitor() throws Exception {
+
+ assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 1 == broker.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
+ }
+ }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
+
+ // and it should be closed due to inactivity
+ assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 0 == broker.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
+ }
+ }, TimeUnit.SECONDS.toMillis(60), TimeUnit.MILLISECONDS.toMillis(500)));
+
+ assertTrue("no exceptions", exceptions.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
index 9c4abc8..6ab86ff 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
@@ -54,6 +54,7 @@ public class WSTransportTestSupport {
@Before
public void setUp() throws Exception {
+ LOG.info("========== Starting test: {} ==========", name.getMethodName());
broker = createBroker(true);
}
@@ -64,6 +65,8 @@ public class WSTransportTestSupport {
} catch(Exception e) {
LOG.warn("Error on Broker stop.");
}
+
+ LOG.info("========== Finished test: {} ==========", name.getMethodName());
}
protected String getWSConnectorURI() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
index 28b6926..aaad323 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTInactivityMonitor.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.mqtt;
import java.io.IOException;
import java.util.Timer;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -32,7 +33,6 @@ import org.apache.activemq.transport.AbstractInactivityMonitor;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
-import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,17 +69,25 @@ public class MQTTInactivityMonitor extends TransportFilter {
long now = System.currentTimeMillis();
- if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isTerminating()) {
+ if ((now - startTime) >= connectionTimeout && connectCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No CONNECT frame received in time for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
}
- ASYNC_TASKS.execute(new Runnable() {
- @Override
- public void run() {
- onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: "
- + next.getRemoteAddress()));
+
+ try {
+ ASYNC_TASKS.execute(new Runnable() {
+ @Override
+ public void run() {
+ onException(new InactivityIOException("Channel was inactive for too (>" + (readKeepAliveTime + readGraceTime) + ") long: "
+ + next.getRemoteAddress()));
+ }
+ });
+ } catch (RejectedExecutionException ex) {
+ if (!ASYNC_TASKS.isShutdown()) {
+ LOG.error("Async connection timeout task was rejected from the executor: ", ex);
+ throw ex;
}
- });
+ }
}
}
};
@@ -109,17 +117,24 @@ public class MQTTInactivityMonitor extends TransportFilter {
return;
}
- if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime && readCheckerTask != null && !ASYNC_TASKS.isTerminating()) {
+ if ((now - lastReceiveTime) >= readKeepAliveTime + readGraceTime && readCheckerTask != null && !ASYNC_TASKS.isShutdown()) {
if (LOG.isDebugEnabled()) {
LOG.debug("No message received since last read check for " + MQTTInactivityMonitor.this.toString() + "! Throwing InactivityIOException.");
}
- ASYNC_TASKS.execute(new Runnable() {
- @Override
- public void run() {
- onException(new InactivityIOException("Channel was inactive for too (>" +
- (connectionTimeout) + ") long: " + next.getRemoteAddress()));
+ try {
+ ASYNC_TASKS.execute(new Runnable() {
+ @Override
+ public void run() {
+ onException(new InactivityIOException("Channel was inactive for too (>" +
+ (connectionTimeout) + ") long: " + next.getRemoteAddress()));
+ }
+ });
+ } catch (RejectedExecutionException ex) {
+ if (!ASYNC_TASKS.isShutdown()) {
+ LOG.error("Async connection timeout task was rejected from the executor: ", ex);
+ throw ex;
}
- });
+ }
}
}
};
@@ -215,7 +230,9 @@ public class MQTTInactivityMonitor extends TransportFilter {
synchronized (AbstractInactivityMonitor.class) {
if (CHECKER_COUNTER == 0) {
- ASYNC_TASKS = createExecutor();
+ if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
+ ASYNC_TASKS = createExecutor();
+ }
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
}
CHECKER_COUNTER++;
@@ -230,7 +247,9 @@ public class MQTTInactivityMonitor extends TransportFilter {
synchronized (AbstractInactivityMonitor.class) {
if (CHECKER_COUNTER == 0) {
- ASYNC_TASKS = createExecutor();
+ if (ASYNC_TASKS == null || ASYNC_TASKS.isShutdown()) {
+ ASYNC_TASKS = createExecutor();
+ }
READ_CHECK_TIMER = new Timer("InactivityMonitor ReadCheck", true);
}
CHECKER_COUNTER++;
@@ -250,8 +269,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
if (CHECKER_COUNTER == 0) {
READ_CHECK_TIMER.cancel();
READ_CHECK_TIMER = null;
- ThreadPoolUtils.shutdown(ASYNC_TASKS);
- ASYNC_TASKS = null;
}
}
}
@@ -268,8 +285,6 @@ public class MQTTInactivityMonitor extends TransportFilter {
if (CHECKER_COUNTER == 0) {
READ_CHECK_TIMER.cancel();
READ_CHECK_TIMER = null;
- ThreadPoolUtils.shutdown(ASYNC_TASKS);
- ASYNC_TASKS = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
index 8b1bc33..fa2c408 100755
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
@@ -41,7 +41,9 @@ public class StompInactivityMonitor extends AbstractInactivityMonitor {
public void startMonitoring() throws IOException {
this.isConfigured = true;
- this.startMonitorThreads();
+
+ stopConnectCheckTask();
+ startMonitorThreads();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
index 87774db..f9d780f 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
@@ -39,7 +39,9 @@ import org.slf4j.LoggerFactory;
* @author <a href="http://hiramchirino.com">chirino</a>
*/
public class StompTransportFilter extends TransportFilter implements StompTransport {
+
private static final Logger TRACE = LoggerFactory.getLogger(StompTransportFilter.class.getPackage().getName() + ".StompIO");
+
private final ProtocolConverter protocolConverter;
private StompInactivityMonitor monitor;
private StompWireFormat wireFormat;
@@ -56,6 +58,14 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
}
@Override
+ public void start() throws Exception {
+ if (monitor != null) {
+ monitor.startConnectCheckTask(getConnectAttemptTimeout());
+ }
+ super.start();
+ }
+
+ @Override
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
@@ -168,12 +178,20 @@ public class StompTransportFilter extends TransportFilter implements StompTransp
public int getMaxDataLength() {
return wireFormat.getMaxDataLength();
}
-
+
public void setMaxFrameSize(int maxFrameSize) {
wireFormat.setMaxFrameSize(maxFrameSize);
}
-
+
public long getMaxFrameSize() {
return wireFormat.getMaxFrameSize();
}
+
+ public long getConnectAttemptTimeout() {
+ return wireFormat.getConnectionAttemptTimeout();
+ }
+
+ public void setConnectAttemptTimeout(long timeout) {
+ wireFormat.setConnectionAttemptTimeout(timeout);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
index 25ba91b..daa4639 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
@@ -45,15 +45,18 @@ public class StompWireFormat implements WireFormat {
private static final int MAX_HEADER_LENGTH = 1024 * 10;
private static final int MAX_HEADERS = 1000;
private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
+ public static final long DEFAULT_CONNECTION_TIMEOUT = 30000;
private int version = 1;
private int maxDataLength = MAX_DATA_LENGTH;
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private String stompVersion = Stomp.DEFAULT_VERSION;
-
+ private long connectionAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
+
//The current frame size as it is unmarshalled from the stream
- private AtomicLong frameSize = new AtomicLong();
+ private final AtomicLong frameSize = new AtomicLong();
@Override
public ByteSequence marshal(Object command) throws IOException {
@@ -104,7 +107,7 @@ public class StompWireFormat implements WireFormat {
public Object unmarshal(DataInput in) throws IOException {
try {
-
+
// parse action
String action = parseAction(in, frameSize);
@@ -131,7 +134,7 @@ public class StompWireFormat implements WireFormat {
// We don't know how much to read.. data ends when we hit a 0
byte b;
ByteArrayOutputStream baos = null;
- while ((b = in.readByte()) != 0) {
+ while ((b = in.readByte()) != 0) {
if (baos == null) {
baos = new ByteArrayOutputStream();
} else if (baos.size() > getMaxDataLength()) {
@@ -141,7 +144,7 @@ public class StompWireFormat implements WireFormat {
throw new ProtocolException("The maximum frame size was exceeded", true);
}
}
-
+
baos.write(b);
}
@@ -191,7 +194,7 @@ public class StompWireFormat implements WireFormat {
protected String parseAction(DataInput in, AtomicLong frameSize) throws IOException {
String action = null;
-
+
// skip white space to next real action line
while (true) {
action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
@@ -209,11 +212,11 @@ public class StompWireFormat implements WireFormat {
}
protected HashMap<String, String> parseHeaders(DataInput in, AtomicLong frameSize) throws IOException {
- HashMap<String, String> headers = new HashMap<String, String>(25);
+ HashMap<String, String> headers = new HashMap<String, String>(25);
while (true) {
ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
if (line != null && line.length > 1) {
-
+
if (headers.size() > MAX_HEADERS) {
throw new ProtocolException("The maximum number of headers was exceeded", true);
}
@@ -257,7 +260,7 @@ public class StompWireFormat implements WireFormat {
}
return headers;
}
-
+
protected int parseContentLength(String contentLength, AtomicLong frameSize) throws ProtocolException {
int length;
try {
@@ -269,7 +272,7 @@ public class StompWireFormat implements WireFormat {
if (length > getMaxDataLength()) {
throw new ProtocolException("The maximum data length was exceeded", true);
}
-
+
if (frameSize.addAndGet(length) > getMaxFrameSize()) {
throw new ProtocolException("The maximum frame size was exceeded", true);
}
@@ -341,7 +344,7 @@ public class StompWireFormat implements WireFormat {
return new String(decoded.toByteArray(), "UTF-8");
}
-
+
@Override
public int getVersion() {
return version;
@@ -375,4 +378,12 @@ public class StompWireFormat implements WireFormat {
public void setMaxFrameSize(long maxFrameSize) {
this.maxFrameSize = maxFrameSize;
}
+
+ public long getConnectionAttemptTimeout() {
+ return connectionAttemptTimeout;
+ }
+
+ public void setConnectionAttemptTimeout(long connectionAttemptTimeout) {
+ this.connectionAttemptTimeout = connectionAttemptTimeout;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompConnectTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompConnectTimeoutTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompConnectTimeoutTest.java
new file mode 100644
index 0000000..69fd4de
--- /dev/null
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompConnectTimeoutTest.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that connection attempts that don't send the connect performative
+ * get cleaned up by the inactivity monitor.
+ */
+@RunWith(Parameterized.class)
+public class StompConnectTimeoutTest extends StompTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StompConnectTimeoutTest.class);
+
+ private Socket connection;
+ protected String connectorScheme;
+
+ @Parameters(name="{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"stomp"},
+ {"stomp+ssl"},
+ {"stomp+nio"},
+ {"stomp+nio+ssl"}
+ });
+ }
+
+ public StompConnectTimeoutTest(String connectorScheme) {
+ this.connectorScheme = connectorScheme;
+ }
+
+ protected String getConnectorScheme() {
+ return connectorScheme;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Throwable e) {}
+ connection = null;
+ }
+ super.tearDown();
+ }
+
+ @Override
+ public String getAdditionalConfig() {
+ return "?transport.connectAttemptTimeout=1200";
+ }
+
+ @Test(timeout = 15000)
+ public void testInactivityMonitor() throws Exception {
+
+ Thread t1 = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ connection = createSocket();
+ connection.getOutputStream().write('S');
+ connection.getOutputStream().flush();
+ } catch (Exception ex) {
+ LOG.error("unexpected exception on connect/disconnect", ex);
+ exceptions.add(ex);
+ }
+ }
+ };
+
+ t1.start();
+
+ assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 1 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
+ }
+ }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
+
+ // and it should be closed due to inactivity
+ assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 0 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
+ }
+ }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500)));
+
+ assertTrue("no exceptions", exceptions.isEmpty());
+ }
+
+ @Override
+ protected boolean isUseTcpConnector() {
+ return connectorScheme.equalsIgnoreCase("stomp");
+ }
+
+ @Override
+ protected boolean isUseSslConnector() {
+ return connectorScheme.equalsIgnoreCase("stomp+ssl");
+ }
+
+ @Override
+ protected boolean isUseNioConnector() {
+ return connectorScheme.equalsIgnoreCase("stomp+nio");
+ }
+
+ @Override
+ protected boolean isUseNioPlusSslConnector() {
+ return connectorScheme.equalsIgnoreCase("stomp+nio+ssl");
+ }
+
+ @Override
+ protected Socket createSocket() throws IOException {
+
+ boolean useSSL = false;
+ int port = 0;
+
+ switch (connectorScheme) {
+ case "stomp":
+ port = this.port;
+ break;
+ case "stomp+ssl":
+ useSSL = true;
+ port = this.sslPort;
+ break;
+ case "stomp+nio":
+ port = this.nioPort;
+ break;
+ case "stomp+nio+ssl":
+ useSSL = true;
+ port = this.nioSslPort;
+ break;
+ default:
+ throw new IOException("Invalid STOMP connector scheme passed to test.");
+ }
+
+ if (useSSL) {
+ return SSLSocketFactory.getDefault().createSocket("localhost", port);
+ } else {
+ return new Socket("localhost", port);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-stomp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/resources/log4j.properties b/activemq-stomp/src/test/resources/log4j.properties
index 7cc1941..f7c2c7f 100755
--- a/activemq-stomp/src/test/resources/log4j.properties
+++ b/activemq-stomp/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@
#
log4j.rootLogger=INFO, out, stdout
-#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
+log4j.logger.org.apache.activemq.transport=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq.transport.failover=TRACE
#log4j.logger.org.apache.activemq.store.jdbc=TRACE
http://git-wip-us.apache.org/repos/asf/activemq/blob/3125caee/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/OpenWireConnectionTimeoutTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/OpenWireConnectionTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/OpenWireConnectionTimeoutTest.java
new file mode 100644
index 0000000..28e0989
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/openwire/OpenWireConnectionTimeoutTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.openwire;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.Socket;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.spring.SpringSslContext;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that connection attempts that don't send the WireFormatInfo performative
+ * get cleaned up by the inactivity monitor.
+ */
+@RunWith(Parameterized.class)
+public class OpenWireConnectionTimeoutTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(OpenWireConnectionTimeoutTest.class);
+
+ @Rule public TestName name = new TestName();
+
+ private Socket connection;
+ protected String connectorScheme;
+ protected int port;
+ protected BrokerService brokerService;
+ protected Vector<Throwable> exceptions = new Vector<Throwable>();
+
+ @Parameters(name="{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"tcp"},
+ {"ssl"},
+ {"nio"},
+ {"nio+ssl"}
+ });
+ }
+
+ public OpenWireConnectionTimeoutTest(String connectorScheme) {
+ this.connectorScheme = connectorScheme;
+ }
+
+ protected String getConnectorScheme() {
+ return connectorScheme;
+ }
+
+ public String getTestName() {
+ return name.getMethodName();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("========== start " + getTestName() + " ==========");
+
+ startBroker();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Throwable e) {}
+ connection = null;
+ }
+
+ stopBroker();
+
+ LOG.info("========== start " + getTestName() + " ==========");
+ }
+
+ public String getAdditionalConfig() {
+ return "?transport.connectAttemptTimeout=1200";
+ }
+
+ @Test(timeout = 90000)
+ public void testInactivityMonitor() throws Exception {
+
+ Thread t1 = new Thread() {
+
+ @Override
+ public void run() {
+ try {
+ connection = createConnection();
+ connection.getOutputStream().write('A');
+ connection.getOutputStream().flush();
+ } catch (Exception ex) {
+ LOG.error("unexpected exception on connect/disconnect", ex);
+ exceptions.add(ex);
+ }
+ }
+ };
+
+ t1.start();
+
+ assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 1 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
+ }
+ }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
+
+ // and it should be closed due to inactivity
+ assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 0 == brokerService.getTransportConnectorByScheme(getConnectorScheme()).connectionCount();
+ }
+ }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(500)));
+
+ assertTrue("no exceptions", exceptions.isEmpty());
+ }
+
+ protected Socket createConnection() throws IOException {
+ boolean useSsl = false;
+
+ switch (connectorScheme) {
+ case "tcp":
+ case "nio":
+ break;
+ case "ssl":
+ case "nio+ssl":
+ useSsl = true;;
+ break;
+ default:
+ throw new IOException("Invalid OpenWire connector scheme passed to test.");
+ }
+
+ if (useSsl) {
+ return SSLSocketFactory.getDefault().createSocket("localhost", port);
+ } else {
+ return new Socket("localhost", port);
+ }
+ }
+
+ protected void startBroker() throws Exception {
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setSchedulerSupport(false);
+ brokerService.setAdvisorySupport(false);
+ brokerService.setUseJmx(false);
+ brokerService.getManagementContext().setCreateConnector(false);
+
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
+ SSLContext.setDefault(ctx);
+
+ // Setup SSL context...
+ final File classesDir = new File(OpenWireConnectionTimeoutTest.class.getProtectionDomain().getCodeSource().getLocation().getFile());
+ File keystore = new File(classesDir, "../../src/test/resources/server.keystore");
+ final SpringSslContext sslContext = new SpringSslContext();
+ sslContext.setKeyStore(keystore.getCanonicalPath());
+ sslContext.setKeyStorePassword("password");
+ sslContext.setTrustStore(keystore.getCanonicalPath());
+ sslContext.setTrustStorePassword("password");
+ sslContext.afterPropertiesSet();
+ brokerService.setSslContext(sslContext);
+
+ System.setProperty("javax.net.ssl.trustStore", keystore.getCanonicalPath());
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", keystore.getCanonicalPath());
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+
+ TransportConnector connector = null;
+
+ switch (connectorScheme) {
+ case "tcp":
+ connector = brokerService.addConnector("tcp://0.0.0.0:0" + getAdditionalConfig());
+ break;
+ case "nio":
+ connector = brokerService.addConnector("nio://0.0.0.0:0" + getAdditionalConfig());
+ break;
+ case "ssl":
+ connector = brokerService.addConnector("ssl://0.0.0.0:0" + getAdditionalConfig());
+ break;
+ case "nio+ssl":
+ connector = brokerService.addConnector("nio+ssl://0.0.0.0:0" + getAdditionalConfig());
+ break;
+ default:
+ throw new IOException("Invalid OpenWire connector scheme passed to test.");
+ }
+
+ brokerService.start();
+ brokerService.waitUntilStarted();
+
+ port = connector.getPublishableConnectURI().getPort();
+ }
+
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ brokerService = null;
+ }
+ }
+
+ public class DefaultTrustManager implements X509TrustManager {
+
+ @Override
+ public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ }
+}