You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/08/09 10:24:03 UTC

[2/2] activemq-artemis git commit: ARTEMIS-671 Returning messages after connection killed, and validating usage of reconnect

ARTEMIS-671 Returning messages after connection killed, and validating usage of reconnect


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/579d6226
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/579d6226
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/579d6226

Branch: refs/heads/master
Commit: 579d6226aa28b79ca15f5cf8ab3cc50415180656
Parents: 2539e6f
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Aug 8 19:29:27 2016 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Aug 9 11:23:46 2016 +0100

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQExceptionType.java |   6 +
 .../core/ActiveMQRemoteDisconnectException.java |  33 +++
 .../artemis/logs/AssertionLoggerHandler.java    |  13 ++
 .../core/impl/RemotingConnectionImpl.java       |  16 +-
 .../impl/netty/ActiveMQChannelHandler.java      |   2 +
 .../protocol/AbstractRemotingConnection.java    |   5 +
 .../spi/core/protocol/RemotingConnection.java   |   7 +
 .../core/protocol/mqtt/MQTTConnection.java      |   5 +
 .../core/protocol/stomp/StompConnection.java    |   5 +
 .../server/impl/RemotingServiceImpl.java        |  28 +--
 .../clientcrash/PendingDeliveriesTest.java      | 206 +++++++++++++++++++
 .../artemis/tests/util/SpawnedVMSupport.java    |  11 +-
 12 files changed, 305 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index eb4bf5d..254d74c 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -172,6 +172,12 @@ public enum ActiveMQExceptionType {
          return new ActiveMQInvalidTransientQueueUseException(msg);
       }
    },
+   REMOTE_DISCONNECT(119) {
+      @Override
+      public ActiveMQException createException(String msg) {
+         return new ActiveMQRemoteDisconnectException(msg);
+      }
+   },
 
    GENERIC_EXCEPTION(999),
    NATIVE_ERROR_INTERNAL(200),

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQRemoteDisconnectException.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQRemoteDisconnectException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQRemoteDisconnectException.java
new file mode 100644
index 0000000..9d44b7d
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQRemoteDisconnectException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.REMOTE_DISCONNECT;
+
+/**
+ * A security problem occurred (authentication issues, permission issues,...)
+ */
+public final class ActiveMQRemoteDisconnectException extends ActiveMQException {
+
+   public ActiveMQRemoteDisconnectException() {
+      super(REMOTE_DISCONNECT);
+   }
+
+   public ActiveMQRemoteDisconnectException(String msg) {
+      super(REMOTE_DISCONNECT, msg);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java
index e0c1215..d7d9214 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AssertionLoggerHandler.java
@@ -71,6 +71,19 @@ public class AssertionLoggerHandler extends ExtHandler {
       return false;
    }
 
+   public static boolean findText(long mstimeout, String ... text) {
+
+      long timeMax = System.currentTimeMillis() + mstimeout;
+      do {
+         if (findText(text)) {
+            return true;
+         }
+      }
+      while (timeMax > System.currentTimeMillis());
+
+      return false;
+
+   }
    /**
     * Find a line that contains the parameters passed as an argument
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index f7dfa32..2a3522f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
@@ -191,7 +192,9 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
          destroyed = true;
       }
 
-      ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+      if (!(me instanceof ActiveMQRemoteDisconnectException)) {
+         ActiveMQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
+      }
 
       try {
          transportConnection.forceClose();
@@ -329,6 +332,17 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
       return getTransportConnection().getDefaultActiveMQPrincipal();
    }
 
+   @Override
+   public boolean isSupportReconnect() {
+      for (Channel channel : channels.values()) {
+         if (channel.getConfirmationWindowSize() > 0) {
+            return true;
+         }
+      }
+
+      return false;
+   }
+
    // Buffer Handler implementation
    // ----------------------------------------------------
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
index d963d1d..c581a5a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
@@ -77,6 +77,8 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
             active = false;
          }
       }
+
+      super.channelInactive(ctx);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index b7c0d17..c438766 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -201,6 +201,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
       return res;
    }
 
+   @Override
+   public boolean isSupportReconnect() {
+      return false;
+   }
+
    /*
     * This can be called concurrently by more than one thread so needs to be locked
     */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
index fe1a087..0f16db7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java
@@ -189,4 +189,11 @@ public interface RemotingConnection extends BufferHandler {
     *if slow consumer is killed,send the msessage to client.
     */
    void killMessage(SimpleString nodeID);
+
+   /**
+    * This will check if reconnects are supported on the protocol and configuration.
+    * In case it's not supported a connection failure could remove messages right away from pending deliveries.
+    * @return
+    */
+   boolean isSupportReconnect();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index f651d3d..aa87bd8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -216,4 +216,9 @@ public class MQTTConnection implements RemotingConnection {
    public void killMessage(SimpleString nodeID) {
       //unsupported
    }
+
+   @Override
+   public boolean isSupportReconnect() {
+      return false;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 475a34c..36f440c 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -96,6 +96,11 @@ public final class StompConnection implements RemotingConnection {
 
    private final int minLargeMessageSize;
 
+   @Override
+   public boolean isSupportReconnect() {
+      return false;
+   }
+
    public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
       StompFrame frame = null;
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 60ac9a0..4ff356e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -40,13 +40,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
+import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
 import org.apache.activemq.artemis.core.protocol.core.impl.CoreProtocolManagerFactory;
-import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.remoting.server.RemotingService;
 import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
@@ -57,7 +57,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.ServiceRegistry;
 import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
 import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
-import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -536,29 +535,10 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
 
       ConnectionEntry conn = connections.get(connectionID);
 
-      if (conn != null) {
-         // Bit of a hack - find a better way to do this
+      if (conn != null && !conn.connection.isSupportReconnect()) {
+         removeConnection(connectionID);
 
-         List<FailureListener> failureListeners = conn.connection.getFailureListeners();
-
-         boolean empty = true;
-
-         for (FailureListener listener : failureListeners) {
-            if (listener instanceof ServerSessionImpl) {
-               empty = false;
-
-               break;
-            }
-         }
-
-         // We only destroy the connection if the connection has no sessions attached to it
-         // Otherwise it means the connection has died without the sessions being closed first
-         // so we need to keep them for ttl, in case re-attachment occurs
-         if (empty) {
-            removeConnection(connectionID);
-
-            conn.connection.destroy();
-         }
+         conn.connection.fail(new ActiveMQRemoteDisconnectException());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java
new file mode 100644
index 0000000..fa49780
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/clientcrash/PendingDeliveriesTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.clientcrash;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.SpawnedVMSupport;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PendingDeliveriesTest extends ClientTestBase {
+
+
+   @Before
+   public void createQueue() throws Exception {
+      server.createQueue(SimpleString.toSimpleString("jms.queue.queue1"), SimpleString.toSimpleString("jms.queue.queue1"), null, true, false);
+   }
+
+   @After
+   public void clearLogger() throws Exception {
+      System.out.println("After clearing");
+      AssertionLoggerHandler.stopCapture();
+      AssertionLoggerHandler.clear();
+   }
+
+   private static final String AMQP_URI = "amqp://localhost:61616?amqp.saslLayer=false";
+   private static final String CORE_URI_NO_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=-1";
+   private static final String CORE_URI_WITH_RECONNECT = "tcp://localhost:61616?confirmationWindowSize=" + (1024 * 1024);
+
+   public static void main(String[] arg) {
+      if (arg.length != 3) {
+         System.err.println("Usage:: URI destinationName cleanShutdown");
+         System.exit(-1);
+      }
+
+
+      String uri = arg[0];
+      String destinationName = arg[1];
+      boolean cleanShutdown = Boolean.valueOf(arg[2]);
+
+
+      ConnectionFactory factory;
+
+      factory = createCF(uri);
+
+      try {
+         Connection connection = factory.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createQueue(destinationName);
+
+         System.err.println("***** " + destination);
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(destination);
+         MessageProducer producer = session.createProducer(destination);
+
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("hello"));
+         }
+
+         System.err.println("CleanShutdown::" + cleanShutdown);
+
+         if (cleanShutdown) {
+            consumer.close();
+            connection.close();
+         }
+
+         System.exit(0);
+
+      }
+      catch (Throwable e) {
+         e.printStackTrace();
+         System.exit(-1);
+      }
+
+
+   }
+
+   private static ConnectionFactory createCF(String uri) {
+      ConnectionFactory factory;
+      if (uri.startsWith("amqp")) {
+         factory = new JmsConnectionFactory(uri);
+      }
+      else {
+         factory = new ActiveMQConnectionFactory(uri);
+      }
+      return factory;
+   }
+
+   @Test
+   public void testWithoutReconnect() throws Exception {
+
+      internalNoReconnect(AMQP_URI, "jms.queue.queue1");
+      internalNoReconnect(CORE_URI_NO_RECONNECT, "queue1");
+   }
+
+   private void internalNoReconnect(String uriToUse, String destinationName) throws Exception {
+      startClient(uriToUse, destinationName, true, false);
+
+      ConnectionFactory cf = createCF(uriToUse);
+      Connection connection = cf.createConnection();
+      connection.start();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createQueue(destinationName);
+         MessageConsumer consumer = session.createConsumer(destination);
+
+         for (int i = 0; i < 100; i++) {
+            Assert.assertNotNull(consumer.receive(1000));
+         }
+      }
+      finally {
+         connection.stop();
+         connection.close();
+
+      }
+
+      if (cf instanceof ActiveMQConnectionFactory) {
+         ((ActiveMQConnectionFactory)cf).close();
+      }
+
+   }
+
+
+   @Test
+   public void testWithtReconnect() throws Exception {
+      startClient(CORE_URI_WITH_RECONNECT, "queue1", true, false);
+      ConnectionFactory cf = createCF(CORE_URI_WITH_RECONNECT);
+      Connection connection = cf.createConnection();
+      connection.start();
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Destination destination = session.createQueue("queue1");
+         MessageConsumer consumer = session.createConsumer(destination);
+
+         int i = 0;
+         for (; i < 100; i++) {
+            Message msg = consumer.receive(100);
+            if (msg == null) {
+               break;
+            }
+         }
+
+         Assert.assertTrue(i < 100);
+      }
+      finally {
+         connection.stop();
+         connection.close();
+
+      }
+   }
+
+
+   @Test
+   public void testCleanShutdownNoLogger() throws Exception {
+      AssertionLoggerHandler.startCapture();
+      startClient(CORE_URI_NO_RECONNECT, "queue1", false, true);
+      Thread.sleep(500);
+      Assert.assertFalse(AssertionLoggerHandler.findText("clearing up resources"));
+   }
+
+   @Test
+   public void testBadShutdownLogger() throws Exception {
+      AssertionLoggerHandler.startCapture();
+      startClient(CORE_URI_NO_RECONNECT, "queue1", false, false);
+      Assert.assertTrue(AssertionLoggerHandler.findText(1000, "clearing up resources"));
+   }
+
+
+   @Test
+   public void testCleanShutdown() throws Exception {
+
+   }
+
+   private void startClient(String uriToUse, String destinationName, boolean log, boolean cleanShutdown) throws Exception {
+      Process process = SpawnedVMSupport.spawnVM(PendingDeliveriesTest.class.getName(), log, uriToUse, destinationName, Boolean.toString(cleanShutdown));
+      Assert.assertEquals(0, process.waitFor());
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/579d6226/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java
index dc81bc6..0ae36b5 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/util/SpawnedVMSupport.java
@@ -119,10 +119,7 @@ public final class SpawnedVMSupport {
 
       Process process = builder.start();
 
-      if (logOutput) {
-         SpawnedVMSupport.startLogger(wordMatch, wordRunning, className, process);
-
-      }
+      SpawnedVMSupport.startLogger(logOutput, wordMatch, wordRunning, className, process);
 
       // Adding a reader to System.err, so the VM won't hang on a System.err.println as identified on this forum thread:
       // http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815
@@ -138,8 +135,8 @@ public final class SpawnedVMSupport {
     * @param process
     * @throws ClassNotFoundException
     */
-   public static void startLogger(final String wordMatch, final Runnable wordRunanble, final String className, final Process process) throws ClassNotFoundException {
-      ProcessLogger outputLogger = new ProcessLogger(true, process.getInputStream(), className, wordMatch, wordRunanble);
+   public static void startLogger(final boolean print, final String wordMatch, final Runnable wordRunanble, final String className, final Process process) throws ClassNotFoundException {
+      ProcessLogger outputLogger = new ProcessLogger(print, process.getInputStream(), className, wordMatch, wordRunanble);
       outputLogger.start();
    }
 
@@ -149,7 +146,7 @@ public final class SpawnedVMSupport {
     * @throws ClassNotFoundException
     */
    public static void startLogger(final String className, final Process process) throws ClassNotFoundException {
-      startLogger(null, null, className, process);
+      startLogger(true, null, null, className, process);
    }
 
    /**