You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/05/18 17:47:15 UTC

[1/4] activemq-artemis git commit: Individualizing traces and debug on client

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 664636dbd -> d728fe771


Individualizing traces and debug on client


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ec526935
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ec526935
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ec526935

Branch: refs/heads/master
Commit: ec52693513b2f3a4a63f9cf4089bb8213ac8468c
Parents: 664636d
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 17 14:36:59 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 17 14:36:59 2016 -0400

----------------------------------------------------------------------
 .../core/ChannelBroadcastEndpointFactory.java   |  7 +-
 .../api/core/JGroupsBroadcastEndpoint.java      |  7 +-
 .../api/core/jgroups/JChannelManager.java       |  5 +-
 .../api/core/jgroups/JChannelWrapper.java       | 17 ++---
 .../api/core/jgroups/JGroupsReceiver.java       |  7 +-
 .../core/client/impl/ClientConsumerImpl.java    | 51 +++++++------
 .../client/impl/ClientSessionFactoryImpl.java   | 80 +++++++++-----------
 .../core/client/impl/ClientSessionImpl.java     | 49 ++++++------
 .../core/client/impl/ServerLocatorImpl.java     | 43 ++++++-----
 .../artemis/core/client/impl/Topology.java      | 62 +++++++--------
 .../artemis/core/cluster/DiscoveryGroup.java    | 13 ++--
 .../impl/ActiveMQClientProtocolManager.java     |  8 +-
 .../core/impl/ActiveMQSessionContext.java       |  7 +-
 .../core/protocol/core/impl/ChannelImpl.java    | 44 +++++------
 .../core/impl/RemotingConnectionImpl.java       | 15 +---
 .../remoting/impl/netty/NettyConnector.java     | 10 ++-
 .../protocol/AbstractRemotingConnection.java    |  5 +-
 .../artemis/utils/OrderedExecutorFactory.java   |  5 +-
 .../artemis/utils/SoftValueHashMap.java         |  8 +-
 .../apache/activemq/artemis/utils/XMLUtil.java  |  5 +-
 20 files changed, 228 insertions(+), 220 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
index d7086a5..af0df2e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
@@ -31,7 +31,6 @@ import org.jgroups.JChannel;
 public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory {
 
    private static final Logger logger = Logger.getLogger(ChannelBroadcastEndpointFactory.class);
-   private static final boolean isTrace = logger.isTraceEnabled();
 
    private final JChannel channel;
 
@@ -47,14 +46,14 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
 //   private static JChannelManager recoverManager(JChannel channel) {
 //      JChannelManager manager = managers.get(channel);
 //      if (manager == null) {
-//         if (isTrace) {
+//         if (logger.isTraceEnabled()) {
 //            logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace"));
 //         }
 //         manager = new JChannelManager();
 //         managers.put(channel, manager);
 //      }
 //      else {
-//         if (isTrace) {
+//         if (logger.isTraceEnabled()) {
 //            logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace"));
 //         }
 //
@@ -69,7 +68,7 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
    }
 
    private ChannelBroadcastEndpointFactory(JChannelManager manager, JChannel channel, String channelName) {
-      if (isTrace) {
+      if (logger.isTraceEnabled()) {
          logger.trace("new ChannelBroadcastEndpointFactory(" + manager + ", " + channel + ", " + channelName, new Exception("trace"));
       }
       this.manager = manager;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
index 7657b0b..79c7e14 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsBroadcastEndpoint.java
@@ -30,7 +30,6 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
 
    private static final Logger logger = Logger.getLogger(JGroupsBroadcastEndpoint.class);
 
-   private static final boolean isTrace = logger.isTraceEnabled();
    private final String channelName;
 
    private boolean clientOpened;
@@ -50,7 +49,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
 
    @Override
    public void broadcast(final byte[] data) throws Exception {
-      if (isTrace) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
+      if (logger.isTraceEnabled()) logger.trace("Broadcasting: BroadCastOpened=" + broadcastOpened + ", channelOPen=" + channel.getChannel().isOpen());
       if (broadcastOpened) {
          org.jgroups.Message msg = new org.jgroups.Message();
 
@@ -62,7 +61,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
 
    @Override
    public byte[] receiveBroadcast() throws Exception {
-      if (isTrace) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
+      if (logger.isTraceEnabled()) logger.trace("Receiving Broadcast: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
       if (clientOpened) {
          return receiver.receiveBroadcast();
       }
@@ -73,7 +72,7 @@ public abstract class JGroupsBroadcastEndpoint implements BroadcastEndpoint {
 
    @Override
    public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
-      if (isTrace) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
+      if (logger.isTraceEnabled()) logger.trace("Receiving Broadcast2: clientOpened=" + clientOpened + ", channelOPen=" + channel.getChannel().isOpen());
       if (clientOpened) {
          return receiver.receiveBroadcast(time, unit);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
index 296dc8a..f594c07 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
@@ -33,7 +33,6 @@ import org.jboss.logging.Logger;
 public class JChannelManager {
 
    private static final Logger logger = Logger.getLogger(JChannelManager.class);
-   private static final boolean isTrace = logger.isTraceEnabled();
 
    private Map<String, JChannelWrapper> channels;
 
@@ -46,11 +45,11 @@ public class JChannelManager {
       if (wrapper == null) {
          wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel());
          channels.put(channelName, wrapper);
-         if (isTrace)
+         if (logger.isTraceEnabled())
             logger.trace("Put Channel " + channelName);
          return wrapper;
       }
-      if (isTrace)
+      if (logger.isTraceEnabled())
          logger.trace("Add Ref Count " + channelName);
       return wrapper.addRef();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
index 7851c9a..7d64dd4 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
@@ -31,7 +31,6 @@ import org.jgroups.ReceiverAdapter;
  */
 public class JChannelWrapper {
    private static final Logger logger = Logger.getLogger(JChannelWrapper.class);
-   private static final boolean isTrace = logger.isTraceEnabled();
 
    private boolean connected = false;
    int refCount = 1;
@@ -47,7 +46,7 @@ public class JChannelWrapper {
       this.manager = manager;
 
 
-      if (isTrace && channel.getReceiver() != null) {
+      if (logger.isTraceEnabled() && channel.getReceiver() != null) {
          logger.trace(this + "The channel already had a receiver previously!!!! == " + channel.getReceiver(), new Exception("trace"));
       }
 
@@ -61,7 +60,7 @@ public class JChannelWrapper {
 
          @Override
          public void receive(org.jgroups.Message msg) {
-            if (isTrace) {
+            if (logger.isTraceEnabled()) {
                logger.trace(this + ":: Wrapper received " + msg + " on channel " + channelName);
             }
             synchronized (receivers) {
@@ -83,7 +82,7 @@ public class JChannelWrapper {
 
    public synchronized void close(boolean closeWrappedChannel) {
       refCount--;
-      if (isTrace) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace"));
+      if (logger.isTraceEnabled()) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace"));
       if (refCount == 0) {
          if (closeWrappedChannel) {
             connected = false;
@@ -96,14 +95,14 @@ public class JChannelWrapper {
    }
 
    public void removeReceiver(JGroupsReceiver receiver) {
-      if (isTrace) logger.trace(this + "::removeReceiver: " + receiver + " on "  + channelName, new Exception("Trace"));
+      if (logger.isTraceEnabled()) logger.trace(this + "::removeReceiver: " + receiver + " on "  + channelName, new Exception("Trace"));
       synchronized (receivers) {
          receivers.remove(receiver);
       }
    }
 
    public synchronized void connect() throws Exception {
-      if (isTrace) {
+      if (logger.isTraceEnabled()) {
          logger.trace(this + ":: Connecting " + channelName, new Exception("Trace"));
       }
 
@@ -121,19 +120,19 @@ public class JChannelWrapper {
 
    public void addReceiver(JGroupsReceiver jGroupsReceiver) {
       synchronized (receivers) {
-         if (isTrace) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " + channelName);
+         if (logger.isTraceEnabled()) logger.trace(this + "::Add Receiver: " + jGroupsReceiver + " on " + channelName);
          receivers.add(jGroupsReceiver);
       }
    }
 
    public void send(org.jgroups.Message msg) throws Exception {
-      if (isTrace) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
+      if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
       channel.send(msg);
    }
 
    public JChannelWrapper addRef() {
       this.refCount++;
-      if (isTrace) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName);
+      if (logger.isTraceEnabled()) logger.trace(this + "::RefCount++ = " + refCount + " on channel " + channelName);
       return this;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
index c931661..c49caf0 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JGroupsReceiver.java
@@ -31,19 +31,18 @@ import org.jgroups.ReceiverAdapter;
 public class JGroupsReceiver extends ReceiverAdapter {
 
    private static final Logger logger = Logger.getLogger(JGroupsReceiver.class);
-   private static final boolean isTrace = logger.isTraceEnabled();
 
    private final BlockingQueue<byte[]> dequeue = new LinkedBlockingDeque<>();
 
    @Override
    public void receive(org.jgroups.Message msg) {
-      if (isTrace) logger.trace("sending message " + msg);
+      if (logger.isTraceEnabled()) logger.trace("sending message " + msg);
       dequeue.add(msg.getBuffer());
    }
 
    public byte[] receiveBroadcast() throws Exception {
       byte[] bytes = dequeue.take();
-      if (isTrace) {
+      if (logger.isTraceEnabled()) {
          logBytes("receiveBroadcast()", bytes);
       }
 
@@ -62,7 +61,7 @@ public class JGroupsReceiver extends ReceiverAdapter {
    public byte[] receiveBroadcast(long time, TimeUnit unit) throws Exception {
       byte[] bytes = dequeue.poll(time, unit);
 
-      if (isTrace) {
+      if (logger.isTraceEnabled()) {
          logBytes("receiveBroadcast(long time, TimeUnit unit)", bytes);
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 8013b72..7b72188 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -43,12 +43,13 @@ import org.apache.activemq.artemis.utils.PriorityLinkedList;
 import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
 import org.apache.activemq.artemis.utils.ReusableLatch;
 import org.apache.activemq.artemis.utils.TokenBucketLimiter;
+import org.jboss.logging.Logger;
 
 public final class ClientConsumerImpl implements ClientConsumerInternal {
    // Constants
    // ------------------------------------------------------------------------------------
 
-   private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
+   private static final Logger logger = Logger.getLogger(ClientConsumerImpl.class);
 
    private static final long CLOSE_TIMEOUT_MILLISECONDS = 10000;
 
@@ -267,8 +268,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
             }
 
             if (callForceDelivery) {
-               if (isTrace) {
-                  ActiveMQClientLogger.LOGGER.trace("Forcing delivery");
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Forcing delivery");
                }
                // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
                sessionContext.forceDelivery(this, forceDeliveryCount++);
@@ -289,15 +290,15 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                      // forced delivery messages are discarded, nothing has been delivered by the queue
                      resetIfSlowConsumer();
 
-                     if (isTrace) {
-                        ActiveMQClientLogger.LOGGER.trace("There was nothing on the queue, leaving it now:: returning null");
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("There was nothing on the queue, leaving it now:: returning null");
                      }
 
                      return null;
                   }
                   else {
-                     if (isTrace) {
-                        ActiveMQClientLogger.LOGGER.trace("Ignored force delivery answer as it belonged to another call");
+                     if (logger.isTraceEnabled()) {
+                        logger.trace("Ignored force delivery answer as it belonged to another call");
                      }
                      // Ignore the message
                      continue;
@@ -329,15 +330,15 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                   largeMessageReceived = m;
                }
 
-               if (isTrace) {
-                  ActiveMQClientLogger.LOGGER.trace("Returning " + m);
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Returning " + m);
                }
 
                return m;
             }
             else {
-               if (isTrace) {
-                  ActiveMQClientLogger.LOGGER.trace("Returning null");
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Returning null");
                }
                resetIfSlowConsumer();
                return null;
@@ -645,8 +646,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
          return;
       }
       if (currentLargeMessageController == null) {
-         if (isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("Sending back credits for largeController = null " + flowControlSize);
+         if (logger.isTraceEnabled()) {
+            logger.trace("Sending back credits for largeController = null " + flowControlSize);
          }
          flowControl(flowControlSize, false);
       }
@@ -761,8 +762,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
          if (creditsToSend >= clientWindowSize) {
             if (clientWindowSize == 0 && discountSlowConsumer) {
-               if (isTrace) {
-                  ActiveMQClientLogger.LOGGER.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
+               if (logger.isTraceEnabled()) {
+                  logger.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
                }
 
                // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
@@ -776,8 +777,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                }
             }
             else {
-               if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-                  ActiveMQClientLogger.LOGGER.debug("Sending " + messageBytes + " from flow-control");
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Sending " + messageBytes + " from flow-control");
                }
 
                final int credits = creditsToSend;
@@ -808,8 +809,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
     * Sending an initial credit for slow consumers
     */
    private void startSlowConsumer() {
-      if (isTrace) {
-         ActiveMQClientLogger.LOGGER.trace("Sending 1 credit to start delivering of one message to slow consumer");
+      if (logger.isTraceEnabled()) {
+         logger.trace("Sending 1 credit to start delivering of one message to slow consumer");
       }
       sendCredits(1);
       try {
@@ -853,8 +854,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
    }
 
    private void queueExecutor() {
-      if (isTrace) {
-         ActiveMQClientLogger.LOGGER.trace("Adding Runner on Executor for delivery");
+      if (logger.isTraceEnabled()) {
+         logger.trace("Adding Runner on Executor for delivery");
       }
 
       sessionExecutor.execute(runner);
@@ -944,8 +945,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
             flowControlBeforeConsumption(message);
 
             if (!expired) {
-               if (isTrace) {
-                  ActiveMQClientLogger.LOGGER.trace("Calling handler.onMessage");
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Calling handler.onMessage");
                }
                final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                   @Override
@@ -979,8 +980,8 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                   onMessageThread = null;
                }
 
-               if (isTrace) {
-                  ActiveMQClientLogger.LOGGER.trace("Handler.onMessage done");
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Handler.onMessage done");
                }
 
                if (message.isLargeMessage()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 9d9d593..96b4059 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -66,17 +66,11 @@ import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.OrderedExecutorFactory;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
 
 public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {
-   // Constants
-   // ------------------------------------------------------------------------------------
 
-   private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
-
-   private static final boolean isDebug = ActiveMQClientLogger.LOGGER.isDebugEnabled();
-
-   // Attributes
-   // -----------------------------------------------------------------------------------
+   private static final Logger logger = Logger.getLogger(ClientSessionFactoryImpl.class);
 
    private final ServerLocatorInternal serverLocator;
 
@@ -270,14 +264,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       }
 
       if (localConnector.isEquivalent(live.getParams()) && backUp != null && !localConnector.isEquivalent(backUp.getParams())) {
-         if (ClientSessionFactoryImpl.isDebug) {
-            ActiveMQClientLogger.LOGGER.debug("Setting up backup config = " + backUp + " for live = " + live);
+         if (logger.isDebugEnabled()) {
+            logger.debug("Setting up backup config = " + backUp + " for live = " + live);
          }
          backupConfig = backUp;
       }
       else {
-         if (ClientSessionFactoryImpl.isDebug) {
-            ActiveMQClientLogger.LOGGER.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live +
+         if (logger.isDebugEnabled()) {
+            logger.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live +
                                                  " / " +
                                                  backUp +
                                                  " but it didn't belong to " +
@@ -514,7 +508,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       }
       catch (ActiveMQInterruptedException e1) {
          // this is just a debug, since an interrupt is an expected event (in case of a shutdown)
-         ActiveMQClientLogger.LOGGER.debug(e1.getMessage(), e1);
+         logger.debug(e1.getMessage(), e1);
       }
       catch (Throwable t) {
          //for anything else just close so clients are un blocked
@@ -548,8 +542,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
             return;
          }
 
-         if (ClientSessionFactoryImpl.isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
+         if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
+            logger.trace("Client Connection failed, calling failure listeners and trying to reconnect, reconnectAttempts=" + reconnectAttempts);
          }
 
          callFailoverListeners(FailoverEventType.FAILURE_DETECTED);
@@ -782,8 +776,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
    private void getConnectionWithRetry(final int reconnectAttempts) {
       if (!clientProtocolManager.isAlive())
          return;
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("getConnectionWithRetry::" + reconnectAttempts +
+      if (logger.isTraceEnabled()) {
+         logger.trace("getConnectionWithRetry::" + reconnectAttempts +
                                               " with retryInterval = " +
                                               retryInterval +
                                               " multiplier = " +
@@ -795,13 +789,13 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       int count = 0;
 
       while (clientProtocolManager.isAlive()) {
-         if (ClientSessionFactoryImpl.isDebug) {
-            ActiveMQClientLogger.LOGGER.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
+         if (logger.isDebugEnabled()) {
+            logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);
          }
 
          if (getConnection() != null) {
-            if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQClientLogger.LOGGER.debug("Reconnection successful");
+            if (logger.isDebugEnabled()) {
+               logger.debug("Reconnection successful");
             }
             return;
          }
@@ -819,7 +813,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                   return;
                }
 
-               if (ClientSessionFactoryImpl.isTrace) {
+               if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
                   ActiveMQClientLogger.LOGGER.waitingForRetry(interval, retryInterval, retryIntervalMultiplier);
                }
 
@@ -842,7 +836,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                interval = newInterval;
             }
             else {
-               ActiveMQClientLogger.LOGGER.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
+               logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");
                return;
             }
          }
@@ -929,14 +923,14 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
             if (serverLocator.getTopology() != null) {
                if (connection != null) {
-                  if (ClientSessionFactoryImpl.isTrace) {
-                     ActiveMQClientLogger.LOGGER.trace(this + "::Subscribing Topology");
+                  if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
+                     logger.trace(this + "::Subscribing Topology");
                   }
                   clientProtocolManager.sendSubscribeTopology(serverLocator.isClusterConnection());
                }
             }
             else {
-               ActiveMQClientLogger.LOGGER.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology"));
+               logger.debug("serverLocator@" + System.identityHashCode(serverLocator + " had no topology"));
             }
 
             return connection;
@@ -1050,8 +1044,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       Connection transportConnection = connector.createConnection();
 
       if (transportConnection == null) {
-         if (ClientSessionFactoryImpl.isDebug) {
-            ActiveMQClientLogger.LOGGER.debug("Connector towards " + connector + " failed");
+         if (logger.isDebugEnabled()) {
+            logger.debug("Connector towards " + connector + " failed");
          }
 
          try {
@@ -1081,8 +1075,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       Connection transportConnection = null;
 
       try {
-         if (ClientSessionFactoryImpl.isDebug) {
-            ActiveMQClientLogger.LOGGER.debug("Trying to connect with connector = " + connectorFactory +
+         if (logger.isDebugEnabled()) {
+            logger.debug("Trying to connect with connector = " + connectorFactory +
                                                  ", parameters = " +
                                                  connectorConfig.getParams() +
                                                  " connector = " +
@@ -1096,8 +1090,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
             connector = liveConnector;
          }
          else if (backupConfig != null) {
-            if (ClientSessionFactoryImpl.isDebug) {
-               ActiveMQClientLogger.LOGGER.debug("Trying backup config = " + backupConfig);
+            if (logger.isDebugEnabled()) {
+               logger.debug("Trying backup config = " + backupConfig);
             }
 
             ConnectorFactory backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
@@ -1109,8 +1103,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
             if (transportConnection != null) {
             /*looks like the backup is now live, let's use that*/
 
-               if (ClientSessionFactoryImpl.isDebug) {
-                  ActiveMQClientLogger.LOGGER.debug("Connected to the backup at " + backupConfig);
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Connected to the backup at " + backupConfig);
                }
 
                // Switching backup as live
@@ -1120,8 +1114,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                connectorFactory = backupConnectorFactory;
             }
             else {
-               if (ClientSessionFactoryImpl.isDebug) {
-                  ActiveMQClientLogger.LOGGER.debug("Backup is not active.");
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Backup is not active.");
                }
             }
 
@@ -1166,7 +1160,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
             theConn.bufferReceived(connectionID, buffer);
          }
          else {
-            ActiveMQClientLogger.LOGGER.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
+            logger.debug("TheConn == null on ClientSessionFactoryImpl::DelegatingBufferHandler, ignoring packet");
          }
       }
    }
@@ -1279,8 +1273,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       Connection transportConnection = createTransportConnection();
 
       if (transportConnection == null) {
-         if (ClientSessionFactoryImpl.isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("Neither backup or live were active, will just give up now");
+         if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {
+            logger.trace("Neither backup or live were active, will just give up now");
          }
          return null;
       }
@@ -1291,8 +1285,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
 
       schedulePing();
 
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("returning " + newConnection);
+      if (logger.isTraceEnabled()) {
+         logger.trace("returning " + newConnection);
       }
 
       return newConnection;
@@ -1320,8 +1314,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       @Override
       public void nodeDisconnected(RemotingConnection conn, String nodeID, String scaleDownTargetNodeID) {
 
-         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-            ActiveMQClientLogger.LOGGER.trace("Disconnect being called on client:" +
+         if (logger.isTraceEnabled()) {
+            logger.trace("Disconnect being called on client:" +
                                                  " server locator = " +
                                                  serverLocator +
                                                  " notifying node " +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 145d963..ae8966c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -51,9 +51,12 @@ import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
 import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.XidCodecSupport;
+import org.jboss.logging.Logger;
 
 public final class ClientSessionImpl implements ClientSessionInternal, FailureListener {
 
+   private static final Logger logger = Logger.getLogger(ClientSessionImpl.class);
+
    private final Map<String, String> metadata = new HashMap<>();
 
    private final ClientSessionFactoryInternal sessionFactory;
@@ -486,8 +489,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    public void commit() throws ActiveMQException {
       checkClosed();
 
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("Sending commit");
+      if (logger.isTraceEnabled()) {
+         logger.trace("Sending commit");
       }
 
       /*
@@ -547,8 +550,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    public void rollback(final boolean isLastMessageAsDelivered, final boolean waitConsumers) throws ActiveMQException
    {
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")");
+      if (logger.isTraceEnabled()) {
+         logger.trace("calling rollback(isLastMessageAsDelivered=" + isLastMessageAsDelivered + ")");
       }
       checkClosed();
 
@@ -745,8 +748,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
       }
 
       checkClosed();
-      if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-         ActiveMQClientLogger.LOGGER.debug("client ack messageID = " + message.getMessageID());
+      if (logger.isDebugEnabled()) {
+         logger.debug("client ack messageID = " + message.getMessageID());
       }
 
       startCall();
@@ -870,12 +873,12 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    @Override
    public void close() throws ActiveMQException {
       if (closed) {
-         ActiveMQClientLogger.LOGGER.debug("Session was already closed, giving up now, this=" + this);
+         logger.debug("Session was already closed, giving up now, this=" + this);
          return;
       }
 
-      if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-         ActiveMQClientLogger.LOGGER.debug("Calling close on session " + this);
+      if (logger.isDebugEnabled()) {
+         logger.debug("Calling close on session " + this);
       }
 
       try {
@@ -891,7 +894,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
          // Session close should always return without exception
 
          // Note - we only log at trace
-         ActiveMQClientLogger.LOGGER.trace("Failed to close session", e);
+         logger.trace("Failed to close session", e);
       }
 
       doCleanup(false);
@@ -1128,8 +1131,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    @Override
    public void commit(final Xid xid, final boolean onePhase) throws XAException {
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("call commit(xid=" + convert(xid));
+      if (logger.isTraceEnabled()) {
+         logger.trace("call commit(xid=" + convert(xid));
       }
       checkXA();
 
@@ -1178,8 +1181,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    @Override
    public void end(final Xid xid, final int flags) throws XAException {
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("Calling end:: " + convert(xid) + ", flags=" + convertTXFlag(flags));
+      if (logger.isTraceEnabled()) {
+         logger.trace("Calling end:: " + convert(xid) + ", flags=" + convertTXFlag(flags));
       }
 
       checkXA();
@@ -1190,7 +1193,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
                rollback(false, false);
             }
             catch (Throwable ignored) {
-               ActiveMQClientLogger.LOGGER.debug("Error on rollback during end call!", ignored);
+               logger.debug("Error on rollback during end call!", ignored);
             }
             throw new XAException(XAException.XAER_RMFAIL);
          }
@@ -1315,8 +1318,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    @Override
    public int prepare(final Xid xid) throws XAException {
       checkXA();
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("Calling prepare:: " + convert(xid));
+      if (logger.isTraceEnabled()) {
+         logger.trace("Calling prepare:: " + convert(xid));
       }
 
       if (rollbackOnly) {
@@ -1402,8 +1405,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    public void rollback(final Xid xid) throws XAException {
       checkXA();
 
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("Calling rollback:: " + convert(xid));
+      if (logger.isTraceEnabled()) {
+         logger.trace("Calling rollback:: " + convert(xid));
       }
 
       try {
@@ -1455,8 +1458,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
 
    @Override
    public void start(final Xid xid, final int flags) throws XAException {
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("Calling start:: " + convert(xid) + " clientXID=" + xid + " flags = " + convertTXFlag(flags));
+      if (logger.isTraceEnabled()) {
+         logger.trace("Calling start:: " + convert(xid) + " clientXID=" + xid + " flags = " + convertTXFlag(flags));
       }
 
       checkXA();
@@ -1633,8 +1636,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
    }
 
    private void doCleanup(boolean failingOver) {
-      if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-         ActiveMQClientLogger.LOGGER.debug("calling cleanup on " + this);
+      if (logger.isDebugEnabled()) {
+         logger.debug("calling cleanup on " + this);
       }
 
       synchronized (this) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index d9f641a..8c71002 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -68,6 +68,7 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadPoolExecutor;
 import org.apache.activemq.artemis.utils.ClassloadingUtil;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithIgnores;
+import org.jboss.logging.Logger;
 
 /**
  * This is the implementation of {@link org.apache.activemq.artemis.api.core.client.ServerLocator} and all
@@ -75,6 +76,8 @@ import org.apache.activemq.artemis.utils.uri.FluentPropertyBeanIntrospectorWithI
  */
 public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {
 
+   private static final Logger logger = Logger.getLogger(ServerLocatorImpl.class);
+
    private enum STATE {
       INITIALIZED, CLOSED, CLOSING
    }
@@ -536,8 +539,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       synchronized (this) {
          // if the topologyArray is null, we will use the initialConnectors
          if (usedTopology != null) {
-            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-               ActiveMQClientLogger.LOGGER.trace("Selecting connector from toplogy.");
+            if (logger.isTraceEnabled()) {
+               logger.trace("Selecting connector from toplogy.");
             }
             int pos = loadBalancingPolicy.select(usedTopology.length);
             Pair<TransportConfiguration, TransportConfiguration> pair = usedTopology[pos];
@@ -546,8 +549,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
          }
          else {
             // Get from initialconnectors
-            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-               ActiveMQClientLogger.LOGGER.trace("Selecting connector from initial connectors.");
+            if (logger.isTraceEnabled()) {
+               logger.trace("Selecting connector from initial connectors.");
             }
 
             int pos = loadBalancingPolicy.select(initialConnectors.length);
@@ -651,8 +654,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    public ClientSessionFactory createSessionFactory(String nodeID) throws Exception {
       TopologyMember topologyMember = topology.getMember(nodeID);
 
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("Creating connection factory towards " + nodeID + " = " + topologyMember + ", topology=" + topology.describe());
+      if (logger.isTraceEnabled()) {
+         logger.trace("Creating connection factory towards " + nodeID + " = " + topologyMember + ", topology=" + topology.describe());
       }
 
       if (topologyMember == null) {
@@ -1323,8 +1326,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
    private void doClose(final boolean sendClose) {
       synchronized (stateGuard) {
          if (state == STATE.CLOSED) {
-            if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQClientLogger.LOGGER.debug(this + " is already closed when calling closed");
+            if (logger.isDebugEnabled()) {
+               logger.debug(this + " is already closed when calling closed");
             }
             return;
          }
@@ -1428,8 +1431,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
          return;
       }
 
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
+      if (logger.isTraceEnabled()) {
+         logger.trace("nodeDown " + this + " nodeID=" + nodeID + " as being down", new Exception("trace"));
       }
 
       topology.removeMember(eventTime, nodeID);
@@ -1462,8 +1465,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                             final String scaleDownGroupName,
                             final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                             final boolean last) {
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
+      if (logger.isTraceEnabled()) {
+         logger.trace("NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception("trace"));
       }
 
       TopologyMemberImpl member = new TopologyMemberImpl(nodeID, backupGroupName, scaleDownGroupName, connectorPair.getA(), connectorPair.getB());
@@ -1654,8 +1657,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
             while (csf == null && !isClosed()) {
                retryNumber++;
                for (Connector conn : connectors) {
-                  if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-                     ActiveMQClientLogger.LOGGER.debug(this + "::Submitting connect towards " + conn);
+                  if (logger.isDebugEnabled()) {
+                     logger.debug(this + "::Submitting connect towards " + conn);
                   }
 
                   csf = conn.tryConnect();
@@ -1690,8 +1693,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                         }
                      });
 
-                     if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-                        ActiveMQClientLogger.LOGGER.debug("Returning " + csf +
+                     if (logger.isDebugEnabled()) {
+                        logger.debug("Returning " + csf +
                                                              " after " +
                                                              retryNumber +
                                                              " retries on StaticConnector " +
@@ -1714,7 +1717,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
          catch (RejectedExecutionException e) {
             if (isClosed() || skipWarnings)
                return null;
-            ActiveMQClientLogger.LOGGER.debug("Rejected execution", e);
+            logger.debug("Rejected execution", e);
             throw e;
          }
          catch (Exception e) {
@@ -1787,8 +1790,8 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
          }
 
          public ClientSessionFactory tryConnect() throws ActiveMQException {
-            if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQClientLogger.LOGGER.debug(this + "::Trying to connect to " + factory);
+            if (logger.isDebugEnabled()) {
+               logger.debug(this + "::Trying to connect to " + factory);
             }
             try {
                ClientSessionFactoryInternal factoryToUse = factory;
@@ -1805,7 +1808,7 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                return factoryToUse;
             }
             catch (ActiveMQException e) {
-               ActiveMQClientLogger.LOGGER.debug(this + "::Exception on establish connector initial connection", e);
+               logger.debug(this + "::Exception on establish connector initial connection", e);
                return null;
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
index 0573b2d..4e9230b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/Topology.java
@@ -30,8 +30,10 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.spi.core.remoting.Connector;
+import org.jboss.logging.Logger;
 
 public final class Topology {
+   private static final Logger logger = Logger.getLogger(Topology.class);
 
    private final Set<ClusterTopologyListener> topologyListeners;
 
@@ -76,8 +78,8 @@ public final class Topology {
       }
       this.executor = executor;
       this.owner = owner;
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception("trace"));
+      if (logger.isTraceEnabled()) {
+         logger.trace("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE", new Exception("trace"));
       }
    }
 
@@ -89,8 +91,8 @@ public final class Topology {
    }
 
    public void addClusterTopologyListener(final ClusterTopologyListener listener) {
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace(this + "::Adding topology listener " + listener, new Exception("Trace"));
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + "::Adding topology listener " + listener, new Exception("Trace"));
       }
       synchronized (topologyListeners) {
          topologyListeners.add(listener);
@@ -99,8 +101,8 @@ public final class Topology {
    }
 
    public void removeClusterTopologyListener(final ClusterTopologyListener listener) {
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace(this + "::Removing topology listener " + listener, new Exception("Trace"));
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + "::Removing topology listener " + listener, new Exception("Trace"));
       }
       synchronized (topologyListeners) {
          topologyListeners.remove(listener);
@@ -112,8 +114,8 @@ public final class Topology {
     */
    public void updateAsLive(final String nodeId, final TopologyMemberImpl memberInput) {
       synchronized (this) {
-         if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-            ActiveMQClientLogger.LOGGER.debug(this + "::node " + nodeId + "=" + memberInput);
+         if (logger.isDebugEnabled()) {
+            logger.debug(this + "::node " + nodeId + "=" + memberInput);
          }
          memberInput.setUniqueEventID(System.currentTimeMillis());
          topology.remove(nodeId);
@@ -142,15 +144,15 @@ public final class Topology {
     */
    public TopologyMemberImpl updateBackup(final TopologyMemberImpl memberInput) {
       final String nodeId = memberInput.getNodeId();
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput);
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + "::updateBackup::" + nodeId + ", memberInput=" + memberInput);
       }
 
       synchronized (this) {
          TopologyMemberImpl currentMember = getMember(nodeId);
          if (currentMember == null) {
-            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-               ActiveMQClientLogger.LOGGER.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, new Exception("trace"));
+            if (logger.isTraceEnabled()) {
+               logger.trace("There's no live to be updated on backup update, node=" + nodeId + " memberInput=" + memberInput, new Exception("trace"));
             }
 
             currentMember = memberInput;
@@ -178,7 +180,7 @@ public final class Topology {
 
       Long deleteTme = getMapDelete().get(nodeId);
       if (deleteTme != null && uniqueEventID != 0 && uniqueEventID < deleteTme) {
-         ActiveMQClientLogger.LOGGER.debug("Update uniqueEvent=" + uniqueEventID +
+         logger.debug("Update uniqueEvent=" + uniqueEventID +
                                               ", nodeId=" +
                                               nodeId +
                                               ", memberInput=" +
@@ -191,8 +193,8 @@ public final class Topology {
          TopologyMemberImpl currentMember = topology.get(nodeId);
 
          if (currentMember == null) {
-            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-               ActiveMQClientLogger.LOGGER.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput, new Exception("trace"));
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + "::NewMemberAdd nodeId=" + nodeId + " member = " + memberInput, new Exception("trace"));
             }
             memberInput.setUniqueEventID(uniqueEventID);
             topology.put(nodeId, memberInput);
@@ -210,8 +212,8 @@ public final class Topology {
                newMember.setBackup(currentMember.getBackup());
             }
 
-            if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-               ActiveMQClientLogger.LOGGER.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" +
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + "::updated currentMember=nodeID=" + nodeId + ", currentMember=" +
                                                     currentMember + ", memberInput=" + memberInput + "newMember=" +
                                                     newMember, new Exception("trace"));
             }
@@ -241,8 +243,8 @@ public final class Topology {
    private void sendMemberUp(final String nodeId, final TopologyMemberImpl memberToSend) {
       final ArrayList<ClusterTopologyListener> copy = copyListeners();
 
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements");
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + "::prepare to send " + nodeId + " to " + copy.size() + " elements");
       }
 
       if (copy.size() > 0) {
@@ -250,8 +252,8 @@ public final class Topology {
             @Override
             public void run() {
                for (ClusterTopologyListener listener : copy) {
-                  if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-                     ActiveMQClientLogger.LOGGER.trace(Topology.this + " informing " +
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(Topology.this + " informing " +
                                                           listener +
                                                           " about node up = " +
                                                           nodeId +
@@ -289,7 +291,7 @@ public final class Topology {
          member = topology.get(nodeId);
          if (member != null) {
             if (member.getUniqueEventID() > uniqueEventID) {
-               ActiveMQClientLogger.LOGGER.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
+               logger.debug("The removeMember was issued before the node " + nodeId + " was started, ignoring call");
                member = null;
             }
             else {
@@ -299,8 +301,8 @@ public final class Topology {
          }
       }
 
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("removeMember " + this +
+      if (logger.isTraceEnabled()) {
+         logger.trace("removeMember " + this +
                                               " removing nodeID=" +
                                               nodeId +
                                               ", result=" +
@@ -316,8 +318,8 @@ public final class Topology {
             @Override
             public void run() {
                for (ClusterTopologyListener listener : copy) {
-                  if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-                     ActiveMQClientLogger.LOGGER.trace(this + " informing " + listener + " about node down = " + nodeId);
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(this + " informing " + listener + " about node down = " + nodeId);
                   }
                   try {
                      listener.nodeDown(uniqueEventID, nodeId);
@@ -333,8 +335,8 @@ public final class Topology {
    }
 
    public synchronized void sendTopology(final ClusterTopologyListener listener) {
-      if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-         ActiveMQClientLogger.LOGGER.debug(this + " is sending topology to " + listener);
+      if (logger.isDebugEnabled()) {
+         logger.debug(this + " is sending topology to " + listener);
       }
 
       executor.execute(new Runnable() {
@@ -349,8 +351,8 @@ public final class Topology {
             }
 
             for (Map.Entry<String, TopologyMemberImpl> entry : copy.entrySet()) {
-               if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-                  ActiveMQClientLogger.LOGGER.debug(Topology.this + " sending " +
+               if (logger.isDebugEnabled()) {
+                  logger.debug(Topology.this + " sending " +
                                                        entry.getKey() +
                                                        " / " +
                                                        entry.getValue().getConnector() +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
index 40b48a6..4c3fdb9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/cluster/DiscoveryGroup.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationService;
 import org.apache.activemq.artemis.utils.TypedProperties;
+import org.jboss.logging.Logger;
 
 /**
  * This class is used to search for members on the cluster through the opaque interface {@link BroadcastEndpoint}.
@@ -47,7 +48,7 @@ import org.apache.activemq.artemis.utils.TypedProperties;
  */
 public final class DiscoveryGroup implements ActiveMQComponent {
 
-   private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
+   private static final Logger logger = Logger.getLogger(DiscoveryGroup.class);
 
    private final List<DiscoveryListener> listeners = new ArrayList<>();
 
@@ -317,10 +318,10 @@ public final class DiscoveryGroup implements ActiveMQComponent {
                //only call the listeners if we have changed
                //also make sure that we aren't stopping to avoid deadlock
                if (changed && started) {
-                  if (isTrace) {
-                     ActiveMQClientLogger.LOGGER.trace("Connectors changed on Discovery:");
+                  if (logger.isTraceEnabled()) {
+                     logger.trace("Connectors changed on Discovery:");
                      for (DiscoveryEntry connector : connectors.values()) {
-                        ActiveMQClientLogger.LOGGER.trace(connector);
+                        logger.trace(connector);
                      }
                   }
                   callListeners();
@@ -376,8 +377,8 @@ public final class DiscoveryGroup implements ActiveMQComponent {
          Map.Entry<String, DiscoveryEntry> entry = iter.next();
 
          if (entry.getValue().getLastUpdate() + timeout <= now) {
-            if (isTrace) {
-               ActiveMQClientLogger.LOGGER.trace("Timed out node on discovery:" + entry.getValue());
+            if (logger.isTraceEnabled()) {
+               logger.trace("Timed out node on discovery:" + entry.getValue());
             }
             iter.remove();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
index 0dbda12..30a0b7a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.protocol.ClientPacketDecoder;
@@ -59,6 +58,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.TopologyResponseHandler;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.VersionLoader;
+import org.jboss.logging.Logger;
 
 /**
  * This class will return specific packets for different types of actions happening on a messaging protocol.
@@ -74,6 +74,8 @@ import org.apache.activemq.artemis.utils.VersionLoader;
 
 public class ActiveMQClientProtocolManager implements ClientProtocolManager {
 
+   private static final Logger logger = Logger.getLogger(ActiveMQClientProtocolManager.class);
+
    private static final String handshake = "ARTEMIS";
 
    private final int versionID = VersionLoader.getVersion().getIncrementingVersion();
@@ -504,8 +506,8 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
          }
 
          if (topMessage.isExit()) {
-            if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQClientLogger.LOGGER.debug("Notifying " + topMessage.getNodeID() + " going down");
+            if (logger.isDebugEnabled()) {
+               logger.debug("Notifying " + topMessage.getNodeID() + " going down");
             }
 
             if (topologyResponseHandler != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 7ec18f5..794df52 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -101,6 +101,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
 import org.apache.activemq.artemis.utils.VersionLoader;
+import org.jboss.logging.Logger;
 
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
 import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
@@ -110,6 +111,8 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES
 
 public class ActiveMQSessionContext extends SessionContext {
 
+   private static final Logger logger = Logger.getLogger(ActiveMQSessionContext.class);
+
    private final Channel sessionChannel;
    private final int serverVersion;
    private int confirmationWindow;
@@ -340,8 +343,8 @@ public class ActiveMQSessionContext extends SessionContext {
          throw new XAException(response.getResponseCode());
       }
 
-      if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
-         ActiveMQClientLogger.LOGGER.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response);
+      if (logger.isTraceEnabled()) {
+         logger.trace("finished commit on " + ClientSessionImpl.convert(xid) + " with response = " + response);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 5dc4bc5..957a3a9 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -39,8 +39,10 @@ import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ActiveMQExceptionMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.PacketsConfirmedMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.jboss.logging.Logger;
 
 public final class ChannelImpl implements Channel {
+   private static final Logger logger = Logger.getLogger(ChannelImpl.class);
 
    public enum CHANNEL_ID {
       /**
@@ -79,8 +81,6 @@ public final class ChannelImpl implements Channel {
       }
    }
 
-   private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
-
    private volatile long id;
 
    /** This is used in */
@@ -242,8 +242,8 @@ public final class ChannelImpl implements Channel {
       synchronized (sendLock) {
          packet.setChannelID(id);
 
-         if (isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("Sending packet nonblocking " + packet + " on channeID=" + id);
+         if (logger.isTraceEnabled()) {
+            logger.trace("Sending packet nonblocking " + packet + " on channeID=" + id);
          }
 
          ActiveMQBuffer buffer = packet.encode(connection);
@@ -258,7 +258,7 @@ public final class ChannelImpl implements Channel {
                   }
                   else {
                      if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) {
-                        ActiveMQClientLogger.LOGGER.debug("timed-out waiting for fail-over condition on non-blocking send");
+                        logger.debug("timed-out waiting for fail-over condition on non-blocking send");
                      }
                   }
                }
@@ -280,8 +280,8 @@ public final class ChannelImpl implements Channel {
             lock.unlock();
          }
 
-         if (isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("Writing buffer for channelID=" + id);
+         if (logger.isTraceEnabled()) {
+            logger.trace("Writing buffer for channelID=" + id);
          }
 
          checkReconnectID(reconnectID);
@@ -346,7 +346,7 @@ public final class ChannelImpl implements Channel {
                   }
                   else {
                      if (!failoverCondition.await(connection.getBlockingCallFailoverTimeout(), TimeUnit.MILLISECONDS)) {
-                        ActiveMQClientLogger.LOGGER.debug("timed-out waiting for fail-over condition on blocking send");
+                        logger.debug("timed-out waiting for fail-over condition on blocking send");
                      }
                   }
                }
@@ -427,12 +427,12 @@ public final class ChannelImpl implements Channel {
             try {
                boolean callNext = interceptor.intercept(packet, connection);
 
-               if (ActiveMQClientLogger.LOGGER.isDebugEnabled()) {
+               if (logger.isDebugEnabled()) {
                   // use a StringBuilder for speed since this may be executed a lot
                   StringBuilder msg = new StringBuilder();
                   msg.append("Invocation of interceptor ").append(interceptor.getClass().getName()).append(" on ").
                      append(packet).append(" returned ").append(callNext);
-                  ActiveMQClientLogger.LOGGER.debug(msg.toString());
+                  logger.debug(msg.toString());
                }
 
                if (!callNext) {
@@ -505,8 +505,8 @@ public final class ChannelImpl implements Channel {
    @Override
    public void replayCommands(final int otherLastConfirmedCommandID) {
       if (resendCache != null) {
-         if (isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("Replaying commands on channelID=" + id);
+         if (logger.isTraceEnabled()) {
+            logger.trace("Replaying commands on channelID=" + id);
          }
          clearUpTo(otherLastConfirmedCommandID);
 
@@ -553,8 +553,8 @@ public final class ChannelImpl implements Channel {
 
          confirmed.setChannelID(id);
 
-         if (isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed);
+         if (logger.isTraceEnabled()) {
+            logger.trace("ChannelImpl::flushConfirmation flushing confirmation " + confirmed);
          }
 
          doWrite(confirmed);
@@ -566,8 +566,8 @@ public final class ChannelImpl implements Channel {
       if (resendCache != null && packet.isRequiresConfirmations()) {
          lastConfirmedCommandID.incrementAndGet();
 
-         if (isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
+         if (logger.isTraceEnabled()) {
+            logger.trace("ChannelImpl::confirming packet " + packet + " last commandID=" + lastConfirmedCommandID);
          }
 
          receivedBytes += packet.getPacketSize();
@@ -639,16 +639,16 @@ public final class ChannelImpl implements Channel {
    private void addResendPacket(Packet packet) {
       resendCache.add(packet);
 
-      if (isTrace) {
-         ActiveMQClientLogger.LOGGER.trace("ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
+      if (logger.isTraceEnabled()) {
+         logger.trace("ChannelImpl::addResendPacket adding packet " + packet + " stored commandID=" + firstStoredCommandID + " possible commandIDr=" + (firstStoredCommandID + resendCache.size()));
       }
    }
 
    private void clearUpTo(final int lastReceivedCommandID) {
       final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
 
-      if (isTrace) {
-         ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID +
+      if (logger.isTraceEnabled()) {
+         logger.trace("ChannelImpl::clearUpTo lastReceived commandID=" + lastReceivedCommandID +
                                               " first commandID=" + firstStoredCommandID +
                                               " number to clear " + numberToClear);
       }
@@ -662,8 +662,8 @@ public final class ChannelImpl implements Channel {
             return;
          }
 
-         if (isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
+         if (logger.isTraceEnabled()) {
+            logger.trace("ChannelImpl::clearUpTo confirming " + packet + " towards " + commandConfirmationHandler);
          }
          if (commandConfirmationHandler != null) {
             commandConfirmationHandler.commandConfirmed(packet);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
index 1cb4f2a..b051519 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java
@@ -38,18 +38,11 @@ import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.jboss.logging.Logger;
 
 public class RemotingConnectionImpl extends AbstractRemotingConnection implements CoreRemotingConnection {
-   // Constants
-   // ------------------------------------------------------------------------------------
+   private static final Logger logger = Logger.getLogger(RemotingConnectionImpl.class);
 
-   private static final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
-
-   // Static
-   // ---------------------------------------------------------------------------------------
-
-   // Attributes
-   // -----------------------------------------------------------------------------------
    private final PacketDecoder packetDecoder;
 
    private final Map<Long, Channel> channels = new ConcurrentHashMap<>();
@@ -342,8 +335,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
       try {
          final Packet packet = packetDecoder.decode(buffer);
 
-         if (isTrace) {
-            ActiveMQClientLogger.LOGGER.trace("handling packet " + packet);
+         if (logger.isTraceEnabled()) {
+            logger.trace("handling packet " + packet);
          }
 
          dataReceived = true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
index 6853b79..b209f89 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java
@@ -101,10 +101,12 @@ import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.artemis.utils.FutureLatch;
+import org.jboss.logging.Logger;
 
 import static org.apache.activemq.artemis.utils.Base64.encodeBytes;
 
 public class NettyConnector extends AbstractConnector {
+   private static final Logger logger = Logger.getLogger(NettyConnector.class);
 
    // Constants -----------------------------------------------------
    public static final String JAVAX_KEYSTORE_PATH_PROP_NAME = "javax.net.ssl.keyStore";
@@ -528,7 +530,7 @@ public class NettyConnector extends AbstractConnector {
          batchFlusherFuture = scheduledThreadPool.scheduleWithFixedDelay(flusher, batchDelay, batchDelay, TimeUnit.MILLISECONDS);
       }
 
-      ActiveMQClientLogger.LOGGER.debug("Started Netty Connector version " + TransportConstants.NETTY_VERSION);
+      logger.debug("Started Netty Connector version " + TransportConstants.NETTY_VERSION);
    }
 
    @Override
@@ -589,7 +591,7 @@ public class NettyConnector extends AbstractConnector {
          }
       }
 
-      ActiveMQClientLogger.LOGGER.debug("Remote destination: " + remoteDestination);
+      logger.debug("Remote destination: " + remoteDestination);
 
       ChannelFuture future;
       //port 0 does not work so only use local address if set
@@ -659,7 +661,7 @@ public class NettyConnector extends AbstractConnector {
                request.headers().set(SEC_ACTIVEMQ_REMOTING_KEY, key);
                ch.attr(REMOTING_KEY).set(key);
 
-               ActiveMQClientLogger.LOGGER.debugf("Sending HTTP request %s", request);
+               logger.debugf("Sending HTTP request %s", request);
 
                // Send the HTTP request.
                ch.writeAndFlush(request);
@@ -985,7 +987,7 @@ public class NettyConnector extends AbstractConnector {
          InetAddress inetAddr2 = InetAddress.getByName(this.host);
          String ip1 = inetAddr1.getHostAddress();
          String ip2 = inetAddr2.getHostAddress();
-         ActiveMQClientLogger.LOGGER.debug(this + " host 1: " + host + " ip address: " + ip1 + " host 2: " + this.host + " ip address: " + ip2);
+         logger.debug(this + " host 1: " + host + " ip address: " + ip1 + " host 2: " + this.host + " ip address: " + ip2);
 
          result = ip1.equals(ip2);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
index ee2449b..b7c0d17 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java
@@ -30,9 +30,12 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.jboss.logging.Logger;
 
 public abstract class AbstractRemotingConnection implements RemotingConnection {
 
+   private static final Logger logger = Logger.getLogger(AbstractRemotingConnection.class);
+
    protected final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();
    protected final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();
    protected final Connection transportConnection;
@@ -65,7 +68,7 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
          }
          catch (ActiveMQInterruptedException interrupted) {
             // this is an expected behaviour.. no warn or error here
-            ActiveMQClientLogger.LOGGER.debug("thread interrupted", interrupted);
+            logger.debug("thread interrupted", interrupted);
          }
          catch (final Throwable t) {
             // Failure of one listener to execute shouldn't prevent others

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
index b1e9d88..b0b13ae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
@@ -23,12 +23,15 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.jboss.logging.Logger;
 
 /**
  * A factory for producing executors that run all tasks in order, which delegate to a single common executor instance.
  */
 public final class OrderedExecutorFactory implements ExecutorFactory {
 
+   private static final Logger logger = Logger.getLogger(OrderedExecutorFactory.class);
+
    private final Executor parent;
 
    /**
@@ -101,7 +104,7 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
                      }
                      catch (ActiveMQInterruptedException e) {
                         // This could happen during shutdowns. Nothing to be concerned about here
-                        ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e);
+                        logger.debug("Interrupted Thread", e);
                      }
                      catch (Throwable t) {
                         ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
index 4cfbfa4..6428c8a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
@@ -28,11 +28,11 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.jboss.logging.Logger;
 
 public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implements Map<K, V> {
 
-   private final boolean isTrace = ActiveMQClientLogger.LOGGER.isTraceEnabled();
+   private static final Logger logger = Logger.getLogger(SoftValueHashMap.class);
 
    // The soft references that are already good.
    // too bad there's no way to override the queue method on ReferenceQueue, so I wouldn't need this
@@ -170,8 +170,8 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
             if (ref.used > 0) {
                Object removed = mapDelegate.remove(ref.key);
 
-               if (isTrace) {
-                  ActiveMQClientLogger.LOGGER.trace("Removing " + removed + " with id = " + ref.key + " from SoftValueHashMap");
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Removing " + removed + " with id = " + ref.key + " from SoftValueHashMap");
                }
 
                if (mapDelegate.size() <= maxElements) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec526935/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XMLUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XMLUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XMLUtil.java
index bd96a71..ca51c08 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XMLUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/XMLUtil.java
@@ -34,6 +34,7 @@ import java.util.List;
 
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
 import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
+import org.jboss.logging.Logger;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.NamedNodeMap;
@@ -44,6 +45,8 @@ import org.xml.sax.SAXException;
 
 public final class XMLUtil {
 
+   private static final Logger logger = Logger.getLogger(XMLUtil.class);
+
    private XMLUtil() {
       // Utility class
    }
@@ -288,7 +291,7 @@ public final class XMLUtil {
             val = parts[1].trim();
          }
          String sysProp = System.getProperty(prop, val);
-         ActiveMQClientLogger.LOGGER.debug("replacing " + subString + " with " + sysProp);
+         logger.debug("replacing " + subString + " with " + sysProp);
          xml = xml.replace(subString, sysProp);
 
       }


[2/4] activemq-artemis git commit: ARTEMIS-524 Paging could lose data eventually after crashes

Posted by jb...@apache.org.
ARTEMIS-524 Paging could lose data eventually after crashes

https://issues.apache.org/jira/browse/ARTEMIS-524

I am keeping all the debug and tracing statements I added while debugging this issue;
for that reason this commit may look longer than expected

The fix will be highlighted by the tests added in org.apache.activemq.artemis.tests.integration.client.PagingTest


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3e2adf12
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3e2adf12
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3e2adf12

Branch: refs/heads/master
Commit: 3e2adf123b96c3dfade3d1584ea0ddf65a876941
Parents: ec52693
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 17 15:06:02 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 17 20:28:40 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/ClientConsumerImpl.java    |  90 ++++++-
 .../core/impl/ActiveMQConsumerContext.java      |   7 +
 .../core/protocol/core/impl/ChannelImpl.java    |   3 +
 .../artemis/utils/SoftValueHashMap.java         |   1 +
 .../jms/client/ActiveMQMessageConsumer.java     |   2 +
 .../artemis/jms/client/JMSExceptionHelper.java  |  10 +
 .../jms/client/JMSMessageListenerWrapper.java   |   3 +
 .../artemis/core/paging/PagingStoreFactory.java |   5 +
 .../core/paging/cursor/NonExistentPage.java     |  43 ++++
 .../core/paging/cursor/PageCursorProvider.java  |   3 +
 .../core/paging/cursor/PageSubscription.java    |   2 +-
 .../paging/cursor/impl/LivePageCacheImpl.java   |   6 +-
 .../cursor/impl/PageCursorProviderImpl.java     |  91 ++++---
 .../cursor/impl/PageSubscriptionImpl.java       |  53 +++-
 .../activemq/artemis/core/paging/impl/Page.java |   4 +-
 .../paging/impl/PageTransactionInfoImpl.java    |  20 ++
 .../core/paging/impl/PagingStoreFactoryNIO.java |   7 +
 .../core/paging/impl/PagingStoreImpl.java       |  11 +-
 .../impl/journal/JournalStorageManager.java     |   6 +-
 .../impl/journal/LargeServerMessageImpl.java    |  15 +-
 .../postoffice/impl/DuplicateIDCacheImpl.java   |   2 +-
 .../server/impl/RemotingServiceImpl.java        |   2 +
 .../core/server/impl/ActiveMQServerImpl.java    |   8 +-
 .../artemis/core/server/impl/RefsOperation.java |   5 +
 .../core/server/impl/ServerConsumerImpl.java    |   8 +
 .../tests/integration/client/PagingTest.java    | 254 +++++++++++++++++++
 .../core/paging/impl/PagingStoreImplTest.java   |  14 +-
 27 files changed, 605 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
index 7b72188..57bb869 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java
@@ -171,6 +171,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
       this.contextClassLoader = contextClassLoader;
 
       this.flowControlExecutor = flowControlExecutor;
+
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + ":: being created at", new Exception("trace"));
+      }
    }
 
    // ClientConsumer implementation
@@ -182,9 +186,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
    }
 
    private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws ActiveMQException {
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ")");
+      }
+
       checkClosed();
 
       if (largeMessageReceived != null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> discard LargeMessage body for " + largeMessageReceived);
+         }
          // Check if there are pending packets to be received
          largeMessageReceived.discardBody();
          largeMessageReceived = null;
@@ -195,10 +206,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
       }
 
       if (handler != null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> throwing messageHandlerSet");
+         }
          throw ActiveMQClientMessageBundle.BUNDLE.messageHandlerSet();
       }
 
       if (clientWindowSize == 0) {
+         if (logger.isTraceEnabled()) {
+            logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> start slowConsumer");
+         }
          startSlowConsumer();
       }
 
@@ -235,6 +252,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                      }
                   }
 
+                  if ( m != null) {
+                     session.workDone();
+                  }
+
                   try {
                      wait(toWait);
                   }
@@ -256,6 +277,10 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
             if (failedOver) {
                if (m == null) {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> m == null and failover");
+                  }
+
                   // if failed over and the buffer is null, we reset the state and try it again
                   failedOver = false;
                   deliveryForced = false;
@@ -263,13 +288,16 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                   continue;
                }
                else {
+                  if (logger.isTraceEnabled()) {
+                     logger.trace(this + "::receive(" + timeout + ", " + forcingDelivery + ") -> failedOver, but m != null, being " + m);
+                  }
                   failedOver = false;
                }
             }
 
             if (callForceDelivery) {
                if (logger.isTraceEnabled()) {
-                  logger.trace("Forcing delivery");
+                  logger.trace(this + "::Forcing delivery");
                }
                // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed dead locks
                sessionContext.forceDelivery(this, forceDeliveryCount++);
@@ -291,14 +319,14 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                      resetIfSlowConsumer();
 
                      if (logger.isTraceEnabled()) {
-                        logger.trace("There was nothing on the queue, leaving it now:: returning null");
+                        logger.trace(this + "::There was nothing on the queue, leaving it now:: returning null");
                      }
 
                      return null;
                   }
                   else {
                      if (logger.isTraceEnabled()) {
-                        logger.trace("Ignored force delivery answer as it belonged to another call");
+                        logger.trace(this + "::Ignored force delivery answer as it belonged to another call");
                      }
                      // Ignore the message
                      continue;
@@ -331,14 +359,14 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                }
 
                if (logger.isTraceEnabled()) {
-                  logger.trace("Returning " + m);
+                  logger.trace(this + "::Returning " + m);
                }
 
                return m;
             }
             else {
                if (logger.isTraceEnabled()) {
-                  logger.trace("Returning null");
+                  logger.trace(this + "::Returning null");
                }
                resetIfSlowConsumer();
                return null;
@@ -352,12 +380,23 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
    @Override
    public ClientMessage receive(final long timeout) throws ActiveMQException {
+
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + ":: receive(" + timeout + ")");
+      }
       ClientMessage msg = receive(timeout, false);
 
       if (msg == null && !closed) {
+         if (logger.isTraceEnabled()) {
+            logger.trace(this + ":: receive(" + timeout + ") -> null, trying again with receive(0)");
+         }
          msg = receive(0, true);
       }
 
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + ":: returning " + msg);
+      }
+
       return msg;
    }
 
@@ -471,6 +510,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
    @Override
    public void clearAtFailover() {
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + "::ClearAtFailover");
+      }
       clearBuffer();
 
       // failover will issue a start later
@@ -647,7 +689,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
       }
       if (currentLargeMessageController == null) {
          if (logger.isTraceEnabled()) {
-            logger.trace("Sending back credits for largeController = null " + flowControlSize);
+            logger.trace(this + "::Sending back credits for largeController = null " + flowControlSize);
          }
          flowControl(flowControlSize, false);
       }
@@ -722,12 +764,23 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
          individualAcknowledge(message);
       }
       else {
+
          ackBytes += message.getEncodeSize();
 
+         if (logger.isTraceEnabled()) {
+            logger.trace(this + "::acknowledge ackBytes=" + ackBytes + " and ackBatchSize=" + ackBatchSize + ", encodeSize=" + message.getEncodeSize());
+         }
+
          if (ackBytes >= ackBatchSize) {
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + ":: acknowledge acking " + cmi);
+            }
             doAck(cmi);
          }
          else {
+            if (logger.isTraceEnabled()) {
+               logger.trace(this + ":: acknowledge setting lastAckedMessage = " + cmi);
+            }
             lastAckedMessage = cmi;
          }
       }
@@ -745,6 +798,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
    @Override
    public void flushAcks() throws ActiveMQException {
       if (lastAckedMessage != null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace(this + "::FlushACK acking lastMessage::" + lastAckedMessage);
+         }
          doAck(lastAckedMessage);
       }
    }
@@ -763,7 +819,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
          if (creditsToSend >= clientWindowSize) {
             if (clientWindowSize == 0 && discountSlowConsumer) {
                if (logger.isTraceEnabled()) {
-                  logger.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
+                  logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
                }
 
                // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
@@ -810,7 +866,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
     */
    private void startSlowConsumer() {
       if (logger.isTraceEnabled()) {
-         logger.trace("Sending 1 credit to start delivering of one message to slow consumer");
+         logger.trace(this + "::Sending 1 credit to start delivering of one message to slow consumer");
       }
       sendCredits(1);
       try {
@@ -855,7 +911,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
    private void queueExecutor() {
       if (logger.isTraceEnabled()) {
-         logger.trace("Adding Runner on Executor for delivery");
+         logger.trace(this + "::Adding Runner on Executor for delivery");
       }
 
       sessionExecutor.execute(runner);
@@ -946,7 +1002,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
             if (!expired) {
                if (logger.isTraceEnabled()) {
-                  logger.trace("Calling handler.onMessage");
+                  logger.trace(this + "::Calling handler.onMessage");
                }
                final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                   @Override
@@ -981,7 +1037,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
                }
 
                if (logger.isTraceEnabled()) {
-                  logger.trace("Handler.onMessage done");
+                  logger.trace(this + "::Handler.onMessage done");
                }
 
                if (message.isLargeMessage()) {
@@ -1065,9 +1121,21 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
 
       lastAckedMessage = null;
 
+      if (logger.isTraceEnabled()) {
+         logger.trace(this + "::Acking message " + message);
+      }
+
       session.acknowledge(this, message);
    }
 
+   @Override
+   public String toString() {
+      return super.toString() + "{" +
+         "consumerContext=" + consumerContext +
+         ", queueName=" + queueName +
+         '}';
+   }
+
    // Inner classes
    // --------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
index 08abb91..65540ee 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQConsumerContext.java
@@ -46,6 +46,13 @@ public class ActiveMQConsumerContext extends ConsumerContext {
    }
 
    @Override
+   public String toString() {
+      return "ActiveMQConsumerContext{" +
+         "id=" + id +
+         '}';
+   }
+
+   @Override
    public int hashCode() {
       return (int) (id ^ (id >>> 32));
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 957a3a9..b4ac75d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -363,6 +363,9 @@ public final class ChannelImpl implements Channel {
 
             checkReconnectID(reconnectID);
 
+            if (logger.isTraceEnabled()) {
+               logger.trace("Sending blocking " + packet);
+            }
             connection.getTransportConnection().write(buffer, false, false);
 
             long toWait = connection.getBlockingCallTimeout();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
index 6428c8a..b499910 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/SoftValueHashMap.java
@@ -316,6 +316,7 @@ public class SoftValueHashMap<K, V extends SoftValueHashMap.ValueCache> implemen
    private void processQueue() {
       AggregatedSoftReference ref = null;
       while ((ref = (AggregatedSoftReference) this.refQueue.poll()) != null) {
+         logger.tracef("Removing reference through processQueue:: key=%s", ref.key); // a dequeued reference is already cleared, so ref.get() would always log null
          mapDelegate.remove(ref.key);
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index 04e4f41..8929fa5 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -27,12 +27,14 @@ import javax.jms.Topic;
 import javax.jms.TopicSubscriber;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 
 /**
  * ActiveMQ Artemis implementation of a JMS MessageConsumer.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java
index 666fa9d..1a8456f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSExceptionHelper.java
@@ -22,9 +22,19 @@ import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 
 public final class JMSExceptionHelper {
 
+   public static JMSException convertFromActiveMQException(final ActiveMQInterruptedException me) {
+      JMSException je = new javax.jms.IllegalStateException(me.getMessage());
+
+      je.setStackTrace(me.getStackTrace());
+
+      je.initCause(me);
+      return je;
+   }
+
    public static JMSException convertFromActiveMQException(final ActiveMQException me) {
       JMSException je;
       switch (me.getType()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
index ab62dbc..af5b158 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/JMSMessageListenerWrapper.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
 
 public class JMSMessageListenerWrapper implements MessageHandler {
 
@@ -83,6 +84,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
             message.acknowledge();
          }
          catch (ActiveMQException e) {
+            ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
             ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
          }
       }
@@ -122,6 +124,7 @@ public class JMSMessageListenerWrapper implements MessageHandler {
             }
          }
          catch (ActiveMQException e) {
+            ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
             ActiveMQJMSClientLogger.LOGGER.errorProcessingMessage(e);
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
index 8c2d11a..7c52c63 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStoreFactory.java
@@ -17,9 +17,12 @@
 package org.apache.activemq.artemis.core.paging;
 
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.SequentialFileFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 
@@ -30,6 +33,8 @@ public interface PagingStoreFactory {
 
    PagingStore newStore(SimpleString address, AddressSettings addressSettings);
 
+   PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor);
+
    void stop() throws InterruptedException;
 
    void setPagingManager(PagingManager manager);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java
new file mode 100644
index 0000000..73a22ce
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/NonExistentPage.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.paging.cursor;
+
+/** Internal unchecked exception raised when a paged message position cannot be resolved (see PageCursorProviderImpl).
+ *  In certain cases an after-commit operation may still try to decrease the reference count of a large message
+ *  after its whole page was cleaned up; that failure is acceptable on that path, hence a dedicated type to identify it. */
+public class NonExistentPage extends RuntimeException {
+
+   public NonExistentPage() {
+   }
+
+   public NonExistentPage(String message) {
+      super(message);
+   }
+
+   public NonExistentPage(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   public NonExistentPage(Throwable cause) {
+      super(cause);
+   }
+
+   public NonExistentPage(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+      super(message, cause, enableSuppression, writableStackTrace);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index 951b83c..b2a6aff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -24,6 +24,9 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
  */
 public interface PageCursorProvider {
 
+   /** Used on tests, to simulate a scenario where the VM cleared space */
+   void clearCache();
+
    PageCache getPageCache(long pageNr);
 
    PagedReference newReference(final PagePosition pos, final PagedMessage msg, PageSubscription sub);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index df2ccc3..89c6d44 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -93,7 +93,7 @@ public interface PageSubscription {
     */
    void reloadACK(PagePosition position);
 
-   void reloadPageCompletion(PagePosition position);
+   void reloadPageCompletion(PagePosition position) throws Exception;
 
    void reloadPageInfo(long pageNr);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
index 29d990a..b964b56 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
@@ -23,13 +23,16 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.jboss.logging.Logger;
 
 /**
  * This is the same as PageCache, however this is for the page that's being currently written.
  */
 public class LivePageCacheImpl implements LivePageCache {
 
-   private final List<PagedMessage> messages = new LinkedList<>();
+   private static final Logger logger = Logger.getLogger(LivePageCacheImpl.class);
+
+   private final List<PagedMessage> messages = new LinkedList<PagedMessage>();
 
    private final Page page;
 
@@ -82,6 +85,7 @@ public class LivePageCacheImpl implements LivePageCache {
 
    @Override
    public synchronized void close() {
+      logger.tracef("Closing %s", this);
       this.isLive = false;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 9862a1f..4f3a6a5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
 import org.apache.activemq.artemis.core.paging.cursor.PageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
@@ -58,20 +59,20 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    /**
     * As an optimization, avoid subsequent schedules as they are unnecessary
     */
-   private final AtomicInteger scheduledCleanup = new AtomicInteger(0);
+   protected final AtomicInteger scheduledCleanup = new AtomicInteger(0);
 
-   private volatile boolean cleanupEnabled = true;
+   protected volatile boolean cleanupEnabled = true;
 
-   private final PagingStore pagingStore;
+   protected final PagingStore pagingStore;
 
-   private final StorageManager storageManager;
+   protected final StorageManager storageManager;
 
    // This is the same executor used at the PageStoreImpl. One Executor per pageStore
    private final Executor executor;
 
    private final SoftValueHashMap<Long, PageCache> softCache;
 
-   private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<>();
+   private final ConcurrentMap<Long, PageSubscription> activeCursors = new ConcurrentHashMap<Long, PageSubscription>();
 
    // Static --------------------------------------------------------
 
@@ -115,7 +116,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
       if (cache == null || pos.getMessageNr() >= cache.getNumberOfMessages()) {
          // sanity check, this should never happen unless there's a bug
-         throw new IllegalStateException("Invalid messageNumber passed = " + pos + " on " + cache);
+         throw new NonExistentPage("Invalid messageNumber passed = " + pos + " on " + cache);
       }
 
       return cache.getMessage(pos.getMessageNr());
@@ -146,9 +147,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                cache = createPageCache(pageId);
                // anyone reading from this cache will have to wait reading to finish first
                // we also want only one thread reading this cache
-               if (logger.isTraceEnabled()) {
-                  logger.trace("adding " + pageId + " into cursor = " + this.pagingStore.getAddress());
-               }
+               logger.tracef("adding pageCache pageNr=%d into cursor = %s", pageId, this.pagingStore.getAddress());
                readPage((int) pageId, cache);
                softCache.put(pageId, cache);
             }
@@ -186,6 +185,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
    @Override
    public void addPageCache(PageCache cache) {
+      logger.tracef("Add page cache %s", cache);
       synchronized (softCache) {
          softCache.put(cache.getPageId(), cache);
       }
@@ -203,6 +203,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
       }
    }
 
+   @Override
    public void clearCache() {
       synchronized (softCache) {
          softCache.clear();
@@ -273,6 +274,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    @Override
    public void scheduleCleanup() {
 
+      if (logger.isTraceEnabled()) {
+         logger.trace("scheduling cleanup", new Exception("trace"));
+      }
       if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
          // Scheduled cleanup was already scheduled before.. never mind!
          // or we have cleanup disabled
@@ -286,7 +290,9 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          public void run() {
             storageManager.setContext(storageManager.newSingleThreadContext());
             try {
-               cleanup();
+               if (cleanupEnabled) {
+                  cleanup();
+               }
             }
             finally {
                storageManager.clearContext();
@@ -336,7 +342,10 @@ public class PageCursorProviderImpl implements PageCursorProvider {
 
    @Override
    public void cleanup() {
-      ArrayList<Page> depagedPages = new ArrayList<>();
+
+      logger.tracef("performing page cleanup %s", this);
+
+      ArrayList<Page> depagedPages = new ArrayList<Page>();
 
       while (true) {
          if (pagingStore.lock(100)) {
@@ -346,6 +355,8 @@ public class PageCursorProviderImpl implements PageCursorProvider {
             return;
       }
 
+      logger.tracef("%s locked", this);
+
       synchronized (this) {
          try {
             if (!pagingStore.isStarted()) {
@@ -356,14 +367,12 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                return;
             }
 
-            if (logger.isDebugEnabled()) {
-               logger.debug("Asserting cleanup for address " + this.pagingStore.getAddress());
-            }
-
             ArrayList<PageSubscription> cursorList = cloneSubscriptions();
 
             long minPage = checkMinPage(cursorList);
 
+            logger.debugf("Asserting cleanup for address %s, firstPage=%d", pagingStore.getAddress(), minPage);
+
             // if the current page is being written...
             // on that case we need to move to verify it in a different way
             if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0) {
@@ -376,18 +385,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
                // All the pages on the cursor are complete.. so we will cleanup everything and store a bookmark
                if (complete) {
 
-                  if (logger.isDebugEnabled()) {
-                     logger.debug("Address " + pagingStore.getAddress() +
-                                                          " is leaving page mode as all messages are consumed and acknowledged from the page store");
-                  }
-
-                  pagingStore.forceAnotherPage();
-
-                  Page currentPage = pagingStore.getCurrentPage();
-
-                  storeBookmark(cursorList, currentPage);
-
-                  pagingStore.stopPaging();
+                  cleanupComplete(cursorList);
                }
             }
 
@@ -423,7 +421,30 @@ public class PageCursorProviderImpl implements PageCursorProvider {
             pagingStore.unlock();
          }
       }
+      finishCleanup(depagedPages);
+
+
+   }
+
+   // Protected as a way to inject testing
+   protected void cleanupComplete(ArrayList<PageSubscription> cursorList) throws Exception {
+      if (logger.isDebugEnabled()) {
+         logger.debug("Address " + pagingStore.getAddress() +
+                                              " is leaving page mode as all messages are consumed and acknowledged from the page store");
+      }
+
+      pagingStore.forceAnotherPage();
+
+      Page currentPage = pagingStore.getCurrentPage();
 
+      storeBookmark(cursorList, currentPage);
+
+      pagingStore.stopPaging();
+   }
+
+   // Protected as a way to inject testing
+   protected void finishCleanup(ArrayList<Page> depagedPages) {
+      logger.tracef("this(%s) finishing cleanup on %s", this, depagedPages);
       try {
          for (Page depagedPage : depagedPages) {
             PageCache cache;
@@ -433,7 +454,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
             }
 
             if (logger.isTraceEnabled()) {
-               logger.trace("Removing page " + depagedPage.getPageId() + " from page-cache");
+               logger.trace("Removing pageNr=" + depagedPage.getPageId() + " from page-cache");
             }
 
             if (cache == null) {
@@ -479,12 +500,15 @@ public class PageCursorProviderImpl implements PageCursorProvider {
    }
 
    private boolean checkPageCompletion(ArrayList<PageSubscription> cursorList, long minPage) {
+
+      logger.tracef("checkPageCompletion(%d)", minPage);
+
       boolean complete = true;
 
       for (PageSubscription cursor : cursorList) {
          if (!cursor.isComplete(minPage)) {
             if (logger.isDebugEnabled()) {
-               logger.debug("Cursor " + cursor + " was considered incomplete at page " + minPage);
+               logger.debug("Cursor " + cursor + " was considered incomplete at pageNr=" + minPage);
             }
 
             complete = false;
@@ -492,7 +516,7 @@ public class PageCursorProviderImpl implements PageCursorProvider {
          }
          else {
             if (logger.isDebugEnabled()) {
-               logger.debug("Cursor " + cursor + "was considered **complete** at page " + minPage);
+               logger.debug("Cursor " + cursor + " was considered **complete** at pageNr=" + minPage);
             }
          }
       }
@@ -545,6 +569,13 @@ public class PageCursorProviderImpl implements PageCursorProvider {
       }
    }
 
+   @Override
+   public String toString() {
+      return "PageCursorProviderImpl{" +
+         "pagingStore=" + pagingStore +
+         '}';
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 57b4efe..440f845 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -191,7 +191,11 @@ final class PageSubscriptionImpl implements PageSubscription {
     * cursor/subscription.
     */
    @Override
-   public void reloadPageCompletion(PagePosition position) {
+   public void reloadPageCompletion(PagePosition position) throws Exception {
+      // if the current page is complete, we must move it out of the way
+      if (pageStore.getCurrentPage().getPageId() == position.getPageNr()) {
+         pageStore.forceAnotherPage();
+      }
       PageCursorInfo info = new PageCursorInfo(position.getPageNr(), position.getMessageNr(), null);
       info.setCompleteInfo(position);
       synchronized (consumedPages) {
@@ -202,6 +206,9 @@ final class PageSubscriptionImpl implements PageSubscription {
    @Override
    public void scheduleCleanupCheck() {
       if (autoCleanup) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Scheduling cleanup", new Exception("trace"));
+         }
          if (scheduledCleanupCount.get() > 2) {
             return;
          }
@@ -212,7 +219,9 @@ final class PageSubscriptionImpl implements PageSubscription {
             @Override
             public void run() {
                try {
-                  cleanupEntries(false);
+                  if (autoCleanup) {
+                     cleanupEntries(false);
+                  }
                }
                catch (Exception e) {
                   ActiveMQServerLogger.LOGGER.problemCleaningCursorPages(e);
@@ -242,6 +251,9 @@ final class PageSubscriptionImpl implements PageSubscription {
       if (completeDelete) {
          counter.delete();
       }
+      if (logger.isTraceEnabled()) {
+         logger.trace("cleanupEntries", new Exception("trace"));
+      }
       Transaction tx = new TransactionImpl(store);
 
       boolean persist = false;
@@ -564,17 +576,23 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    @Override
    public boolean isComplete(long page) {
+      logger.tracef("%s isComplete %d", this, page);
       synchronized (consumedPages) {
          if (empty && consumedPages.isEmpty()) {
+            if (logger.isTraceEnabled()) {
+               logger.tracef("isComplete(%d)::Subscription %s has empty=%s, consumedPages.isEmpty=%s", (Object)page, this, empty, consumedPages.isEmpty());
+            }
             return true;
          }
 
          PageCursorInfo info = consumedPages.get(page);
 
          if (info == null && empty) {
+            logger.tracef("isComplete(%d)::::Couldn't find info and it is empty", page);
             return true;
          }
          else {
+            logger.tracef("isComplete(%d)::calling is %s, consumedPages.isEmpty=%s", (Object)page, this, consumedPages.isEmpty());
             return info != null && info.isDone();
          }
       }
@@ -731,18 +749,18 @@ final class PageSubscriptionImpl implements PageSubscription {
 
    @Override
    public void reloadPageInfo(long pageNr) {
-      getPageInfo(pageNr, true);
+      getPageInfo(pageNr);
    }
 
    private PageCursorInfo getPageInfo(final PagePosition pos) {
-      return getPageInfo(pos.getPageNr(), true);
+      return getPageInfo(pos.getPageNr());
    }
 
-   private PageCursorInfo getPageInfo(final long pageNr, boolean create) {
+   private PageCursorInfo getPageInfo(final long pageNr) {
       synchronized (consumedPages) {
          PageCursorInfo pageInfo = consumedPages.get(pageNr);
 
-         if (create && pageInfo == null) {
+         if (pageInfo == null) {
             PageCache cache = cursorProvider.getPageCache(pageNr);
             if (cache == null) {
                return null;
@@ -814,7 +832,11 @@ final class PageSubscriptionImpl implements PageSubscription {
          tx.setContainsPersistent();
       }
 
-      getPageInfo(position).remove(position);
+      PageCursorInfo info = getPageInfo(position);
+
+      logger.tracef("InstallTXCallback looking up pagePosition %s, result=%s", position, info);
+
+      info.remove(position);
 
       PageCursorTX cursorTX = (PageCursorTX) tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
 
@@ -897,16 +919,17 @@ final class PageSubscriptionImpl implements PageSubscription {
       @Override
       public String toString() {
          try {
-            return "PageCursorInfo::PageID=" + pageId +
+            return "PageCursorInfo::pageNr=" + pageId +
                " numberOfMessage = " +
                numberOfMessages +
                ", confirmed = " +
                confirmed +
                ", isDone=" +
-               this.isDone();
+               this.isDone() +
+               " wasLive = " + wasLive;
          }
          catch (Exception e) {
-            return "PageCursorInfo::PageID=" + pageId +
+            return "PageCursorInfo::pageNr=" + pageId +
                " numberOfMessage = " +
                numberOfMessages +
                ", confirmed = " +
@@ -917,6 +940,7 @@ final class PageSubscriptionImpl implements PageSubscription {
       }
 
       public PageCursorInfo(final long pageId, final int numberOfMessages, final PageCache cache) {
+         logger.tracef("Created PageCursorInfo for pageNr=%d, numberOfMessages=%d,  cache=%s", pageId, numberOfMessages, cache);
          this.pageId = pageId;
          this.numberOfMessages = numberOfMessages;
          if (cache != null) {
@@ -932,6 +956,7 @@ final class PageSubscriptionImpl implements PageSubscription {
        * @param completePage
        */
       public void setCompleteInfo(final PagePosition completePage) {
+         logger.tracef("Setting up complete page %s on cursor %s on subscription %s", completePage, this, PageSubscriptionImpl.this);
          this.completePage = completePage;
       }
 
@@ -940,6 +965,10 @@ final class PageSubscriptionImpl implements PageSubscription {
       }
 
       public boolean isDone() {
+         if (logger.isTraceEnabled()) {
+            logger.trace(PageSubscriptionImpl.this + "::PageCursorInfo(" + pageId + ")::isDone checking with completePage!=null->" + (completePage != null) + " getNumberOfMessages=" + getNumberOfMessages() + ", confirmed=" + confirmed.get() + " and pendingTX=" + pendingTX.get());
+
+         }
          return completePage != null || (getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0);
       }
 
@@ -983,7 +1012,7 @@ final class PageSubscriptionImpl implements PageSubscription {
                                                     " confirmed =  " +
                                                     (confirmed.get() + 1) +
                                                     " pendingTX = " + pendingTX +
-                                                    ", page = " +
+                                                    ", pageNr = " +
                                                     pageId + " posACK = " + posACK);
             }
             catch (Throwable ignored) {
@@ -1189,7 +1218,7 @@ final class PageSubscriptionImpl implements PageSubscription {
                   ignored = true;
                }
 
-               PageCursorInfo info = getPageInfo(message.getPosition().getPageNr(), false);
+               PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
 
                if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
                   continue;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
index 83a6c53..0888416 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/Page.java
@@ -251,7 +251,7 @@ public final class Page implements Comparable<Page> {
       }
 
       if (logger.isDebugEnabled()) {
-         logger.debug("Deleting pageId=" + pageId + " on store " + storeName);
+         logger.debug("Deleting pageNr=" + pageId + " on store " + storeName);
       }
 
       if (messages != null) {
@@ -294,7 +294,7 @@ public final class Page implements Comparable<Page> {
 
    @Override
    public String toString() {
-      return "Page::pageID=" + this.pageId + ", file=" + this.file;
+      return "Page::pageNr=" + this.pageId + ", file=" + this.file;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index 55569b2..1502855 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -34,12 +34,15 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.utils.DataConstants;
+import org.jboss.logging.Logger;
 
 public final class PageTransactionInfoImpl implements PageTransactionInfo {
    // Constants -----------------------------------------------------
 
    // Attributes ----------------------------------------------------
 
+   private static final Logger logger = Logger.getLogger(PageTransactionInfoImpl.class);
+
    private long transactionID;
 
    private volatile long recordID = -1;
@@ -239,19 +242,36 @@ public final class PageTransactionInfoImpl implements PageTransactionInfo {
    public synchronized boolean deliverAfterCommit(PageIterator iterator,
                                                   PageSubscription cursor,
                                                   PagePosition cursorPos) {
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("deliver after commit on " + cursor + ", position=" + cursorPos);
+      }
+
       if (committed && useRedelivery) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("commit & useRedelivery on " + cursor + ", position=" + cursorPos);
+         }
          cursor.addPendingDelivery(cursorPos);
          cursor.redeliver(iterator, cursorPos);
          return true;
       }
       else if (committed) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("committed on " + cursor + ", position=" + cursorPos + ", ignoring position");
+         }
          return false;
       }
       else if (rolledback) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("rolled back, position ignored on " + cursor + ", position=" + cursorPos);
+         }
          cursor.positionIgnored(cursorPos);
          return true;
       }
       else {
+         if (logger.isTraceEnabled()) {
+            logger.trace("deliverAftercommit/else, marking useRedelivery on " + cursor + ", position " + cursorPos);
+         }
          useRedelivery = true;
          if (lateDeliveries == null) {
             lateDeliveries = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 0f36a31..00da382 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -26,6 +26,7 @@ import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -35,6 +36,8 @@ import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -92,6 +95,12 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
    }
 
+   /** Creates a {@link PageCursorProviderImpl} for {@code store}, sized by {@code addressSettings.getPageCacheMaxSize()}. */
+   @Override
+   public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) {
+      return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+   }
+
    @Override
    public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) {
 
       return new PagingStoreImpl(address, scheduledExecutor, syncTimeout, pagingManager, storageManager, null, this, address, settings, executorFactory.getExecutor(), syncNonTransactional);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index a7baf84..8fec06c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
 import org.apache.activemq.artemis.core.paging.cursor.LivePageCache;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
 import org.apache.activemq.artemis.core.paging.cursor.impl.LivePageCacheImpl;
-import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.replication.ReplicationManager;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
@@ -69,7 +68,7 @@ import org.jboss.logging.Logger;
  */
 public class PagingStoreImpl implements PagingStore {
 
-   private static final Logger logger = Logger.getLogger(Page.class);
+   private static final Logger logger = Logger.getLogger(PagingStoreImpl.class);
 
    private final SimpleString address;
 
@@ -173,7 +172,7 @@ public class PagingStoreImpl implements PagingStore {
          this.syncTimer = null;
       }
 
-      this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executor, addressSettings.getPageCacheMaxSize());
+      this.cursorProvider = storeFactory.newCursorProvider(this, this.storageManager, addressSettings, executor);
 
    }
 
@@ -831,7 +830,7 @@ public class PagingStoreImpl implements PagingStore {
 
             if (logger.isTraceEnabled()) {
                logger.trace("Paging message " + pagedMessage + " on pageStore " + this.getStoreName() +
-                                                    " pageId=" + currentPage.getPageId());
+                                                    " pageNr=" + currentPage.getPageId());
             }
 
             return true;
@@ -1021,6 +1020,10 @@ public class PagingStoreImpl implements PagingStore {
 
          int tmpCurrentPageId = currentPageId + 1;
 
+         if (logger.isTraceEnabled()) {
+            logger.trace("new pageNr=" + tmpCurrentPageId, new Exception("trace"));
+         }
+
          if (currentPage != null) {
             currentPage.close(true);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
index acdf57b..1379308 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
@@ -543,13 +543,15 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
             }
             bindingsJournal = new ReplicatedJournal(((byte) 0), originalBindingsJournal, replicator);
             messageJournal = new ReplicatedJournal((byte) 1, originalMessageJournal, replicator);
+
+            // We need to send the list while locking otherwise part of the body might get sent too soon
+            // it will send a list of IDs that we are allocating
+            replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
          }
          finally {
             storageManagerLock.writeLock().unlock();
          }
 
-         // it will send a list of IDs that we are allocating
-         replicator.sendLargeMessageIdListMessage(pendingLargeMessages);
          sendJournalFile(messageFiles, JournalContent.MESSAGES);
          sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
          sendLargeMessageFiles(pendingLargeMessages);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index be193eb..578db6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -340,11 +340,22 @@ public final class LargeServerMessageImpl extends ServerMessageImpl implements L
 
    @Override
    public String toString() {
-      return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() +
-         ",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" +
+      return "LargeServerMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
+         ", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
          ", durable=" + durable + ", address=" + getAddress() + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
    }
 
+   private static String toDate(long timestamp) {
+      if (timestamp == 0) {
+         return "0";
+      }
+      else {
+         return new java.util.Date(timestamp).toString();
+      }
+
+   }
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
index 29774d6..7f35638 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
@@ -222,7 +222,7 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
          }
          else {
             if (logger.isTraceEnabled()) {
-               logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID));
+               logger.trace("DuplicateIDCacheImpl(" + this.address + ")::addToCache Adding duplicateID TX operation for " + describeID(duplID, recordID) + ", tx=" + tx);
             }
             // For a tx, it's important that the entry is not added to the cache until commit
             // since if the client fails then resends them tx we don't want it to get rejected

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index 9b3329a..3672fe2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -176,6 +176,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
          return;
       }
 
+      logger.tracef("Starting remoting service %s", this);
+
       paused = false;
 
       // The remoting service maintains it's own thread pool for handling remoting traffic

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 60235de..50437a1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1671,9 +1671,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       this.queueFactory = factory;
    }
 
-   private PagingManager createPagingManager() {
+   protected PagingManager createPagingManager() {
 
-      return new PagingManagerImpl(new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO), addressSettingsRepository);
+      return new PagingManagerImpl(getPagingStoreFactory(), addressSettingsRepository);
+   }
+
+   protected PagingStoreFactoryNIO getPagingStoreFactory() {
+      return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getJournalBufferTimeout_NIO(), scheduledPool, executorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO);
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 3cdaa66..1f5c74c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.artemis.core.paging.cursor.NonExistentPage;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -165,6 +166,10 @@ public class RefsOperation extends TransactionOperationAbstract {
       try {
          refmsg.getMessage().decrementRefCount();
       }
+      catch (NonExistentPage e) {
+         // This can happen after commit, since the page file may already have been deleted by another thread
+         logger.debug(e);
+      }
       catch (Exception e) {
          ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 0224c7d..ae1f5b1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -854,7 +854,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       boolean startedTransaction = false;
 
+
+      if (logger.isTraceEnabled()) {
+         logger.trace("individualACK messageID=" + messageID);
+      }
+
       if (tx == null) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("individualACK starting new TX");
+         }
          startedTransaction = true;
          tx = new TransactionImpl(storageManager);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
index 4f64f42..f658fae 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/PagingTest.java
@@ -21,6 +21,7 @@ import javax.transaction.xa.Xid;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,6 +30,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -58,7 +60,11 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.persistence.impl.journal.AckDescribe;
 import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal;
 import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal.ReferenceDescribe;
@@ -70,14 +76,18 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.jboss.logging.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 public class PagingTest extends ActiveMQTestBase {
 
+   private static final Logger logger = Logger.getLogger(PagingTest.class);
+
    private ServerLocator locator;
    private ActiveMQServer server;
    private ClientSessionFactory sf;
@@ -2914,6 +2924,250 @@ public class PagingTest extends ActiveMQTestBase {
       session.close();
    }
 
+
+   @Test
+   public void testRollbackOnSendThenSendMore() throws Exception {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultInVMConfig();
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(null, null, false, false, true, false, 0);
+
+      session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+      Queue queue = server.locateQueue(ADDRESS);
+
+      queue.getPageSubscription().getPagingStore().startPaging();
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      ClientMessage message;
+
+      for (int i = 0; i < 20; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(new byte[100 * 4]);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+         session.commit();
+         queue.getPageSubscription().getPagingStore().forceAnotherPage();
+
+      }
+
+      for (int i = 20; i < 24; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(new byte[100 * 4]);
+
+         message.putIntProperty(new SimpleString("id"), i);
+
+         producer.send(message);
+      }
+
+      session.rollback();
+
+      ClientSession consumerSession = sf.createSession(false, false);
+
+
+      queue.getPageSubscription().getPagingStore().disableCleanup();
+
+      queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
+
+      consumerSession.start();
+      ClientConsumer consumer = consumerSession.createConsumer(ADDRESS, SimpleString.toSimpleString("id > 0"));
+      for (int i = 0; i < 19; i++) {
+         ClientMessage messageRec = consumer.receive(5000);
+         System.err.println("msg::" + messageRec);
+         Assert.assertNotNull(messageRec);
+         messageRec.acknowledge();
+         consumerSession.commit();
+
+         // Cleanup is called directly here only because that makes it easier to debug in case of bugs.
+         // If you see an issue with cleanup here, debug this method.
+         queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
+      }
+      queue.getPageSubscription().getPagingStore().enableCleanup();
+
+      consumerSession.close();
+
+
+      session.close();
+      sf.close();
+
+
+      server.stop();
+   }
+
+   // The pages are complete; this simulates a scenario where the server crashed before deleting the pages.
+   @Test
+   public void testRestartWithComplete() throws Exception {
+      clearDataRecreateServerDirs();
+
+      Configuration config = createDefaultInVMConfig();
+
+      final AtomicBoolean mainCleanup = new AtomicBoolean(true);
+
+      class InterruptedCursorProvider extends PageCursorProviderImpl {
+
+         public InterruptedCursorProvider(PagingStore pagingStore,
+                                          StorageManager storageManager,
+                                          Executor executor,
+                                          int maxCacheSize) {
+            super(pagingStore, storageManager, executor, maxCacheSize);
+         }
+
+         @Override
+         public void cleanup() {
+            if (mainCleanup.get()) {
+               super.cleanup();
+            }
+            else {
+               try {
+                  pagingStore.unlock();
+               }
+               catch (Throwable ignored) {
+               }
+            }
+         }
+      }
+
+      server = new ActiveMQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+         @Override
+         protected PagingStoreFactoryNIO getPagingStoreFactory() {
+            return new PagingStoreFactoryNIO(this.getStorageManager(), this.getConfiguration().getPagingLocation(), this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), this.getExecutorFactory(), this.getConfiguration().isJournalSyncNonTransactional(), null) {
+               @Override
+               public PageCursorProvider newCursorProvider(PagingStore store, StorageManager storageManager, AddressSettings addressSettings, Executor executor) {
+                  return new InterruptedCursorProvider(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+               }
+            };
+         }
+
+      };
+
+      addServer(server);
+
+      AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes( PagingTest.PAGE_SIZE).setMaxSizeBytes(PagingTest.PAGE_MAX).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      server.start();
+
+      locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(true, true, 0);
+      session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+      Queue queue = server.locateQueue(ADDRESS);
+
+      queue.getPageSubscription().getPagingStore().startPaging();
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      ClientMessage message;
+
+      for (int i = 0; i < 20; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(new byte[100 * 4]);
+
+         message.putIntProperty(new SimpleString("idi"), i);
+
+         producer.send(message);
+         session.commit();
+         if (i < 19) {
+            queue.getPageSubscription().getPagingStore().forceAnotherPage();
+         }
+
+      }
+
+      Assert.assertEquals(20, queue.getPageSubscription().getPagingStore().getCurrentWritingPage());
+
+      // This will force a scenario where the pages are cleaned up. When restarting, we need to check whether the current page is complete;
+      // if it is complete, we must move to another page, avoiding races on cleanup
+      // which could happen during a crash / restart.
+      long tx = server.getStorageManager().generateID();
+      for (int i = 1; i <= 20; i++) {
+         server.getStorageManager().storePageCompleteTransactional(tx, queue.getID(), new PagePositionImpl(i, 1));
+      }
+
+      server.getStorageManager().commit(tx);
+
+      session.close();
+      sf.close();
+
+      server.stop();
+      mainCleanup.set(false);
+
+      logger.trace("Server restart");
+
+      server.start();
+
+      queue = server.locateQueue(ADDRESS);
+
+      locator = createInVMNonHALocator();
+      sf = createSessionFactory(locator);
+      session = sf.createSession(null, null, false, false, true, false, 0);
+      producer = session.createProducer(PagingTest.ADDRESS);
+
+      for (int i = 0; i < 10; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(new byte[100 * 4]);
+
+         message.putIntProperty(new SimpleString("newid"), i);
+
+         producer.send(message);
+         session.commit();
+
+         if (i == 5) {
+            queue.getPageSubscription().getPagingStore().forceAnotherPage();
+         }
+      }
+
+
+      mainCleanup.set(true);
+
+      queue = server.locateQueue(ADDRESS);
+      queue.getPageSubscription().cleanupEntries(false);
+      queue.getPageSubscription().getPagingStore().getCursorProvider().cleanup();
+
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+      session.start();
+
+      for (int i = 0; i < 10; i++) {
+         message = consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals(i, message.getIntProperty("newid").intValue());
+         message.acknowledge();
+      }
+
+      server.stop();
+
+      //      Thread.sleep(5000);
+
+
+
+   }
+
    @Test
    public void testCommitOnSend() throws Exception {
       clearDataRecreateServerDirs();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3e2adf12/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index d16da9f..498beb4 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -42,6 +42,8 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
+import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
+import org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
 import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
@@ -105,7 +107,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
    public void testDoubleStart() throws Exception {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
 
-      PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, null, PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
+      PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, new FakeStoreFactory(factory), PagingStoreImplTest.destinationTestName, new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE), getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -160,7 +162,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
 
       storeImpl.sync();
 
-      storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, null, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true);
+      storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName, null, 100, createMockManager(), createStorageManagerMock(), factory, storeFactory, PagingStoreImplTest.destinationTestName, addressSettings, getExecutorFactory().getExecutor(), true);
 
       storeImpl.start();
 
@@ -809,6 +811,14 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
       }
 
       @Override
+      public PageCursorProvider newCursorProvider(PagingStore store,
+                                                  StorageManager storageManager,
+                                                  AddressSettings addressSettings,
+                                                  Executor executor) {
+         return new PageCursorProviderImpl(store, storageManager, executor, addressSettings.getPageCacheMaxSize());
+      }
+
+      @Override
       public void setPagingManager(final PagingManager manager) {
       }
 


[4/4] activemq-artemis git commit: This closes #522

Posted by jb...@apache.org.
This closes #522


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d728fe77
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d728fe77
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d728fe77

Branch: refs/heads/master
Commit: d728fe77182e9d0b628af83e356ceaed9dbb52c7
Parents: 664636d a75bd76
Author: jbertram <jb...@apache.org>
Authored: Wed May 18 12:46:44 2016 -0500
Committer: jbertram <jb...@apache.org>
Committed: Wed May 18 12:46:44 2016 -0500

----------------------------------------------------------------------
 .../core/ChannelBroadcastEndpointFactory.java   |   7 +-
 .../api/core/JGroupsBroadcastEndpoint.java      |   7 +-
 .../api/core/jgroups/JChannelManager.java       |   5 +-
 .../api/core/jgroups/JChannelWrapper.java       |  17 +-
 .../api/core/jgroups/JGroupsReceiver.java       |   7 +-
 .../core/client/impl/ClientConsumerImpl.java    | 119 +++++++--
 .../client/impl/ClientSessionFactoryImpl.java   |  80 +++---
 .../core/client/impl/ClientSessionImpl.java     |  49 ++--
 .../core/client/impl/ServerLocatorImpl.java     |  43 ++--
 .../artemis/core/client/impl/Topology.java      |  62 ++---
 .../artemis/core/cluster/DiscoveryGroup.java    |  13 +-
 .../impl/ActiveMQClientProtocolManager.java     |   8 +-
 .../core/impl/ActiveMQConsumerContext.java      |   7 +
 .../core/impl/ActiveMQSessionContext.java       |   7 +-
 .../core/protocol/core/impl/ChannelImpl.java    |  47 ++--
 .../core/impl/RemotingConnectionImpl.java       |  15 +-
 .../remoting/impl/netty/NettyConnector.java     |  10 +-
 .../protocol/AbstractRemotingConnection.java    |   5 +-
 .../artemis/utils/OrderedExecutorFactory.java   |   5 +-
 .../artemis/utils/SoftValueHashMap.java         |   9 +-
 .../apache/activemq/artemis/utils/XMLUtil.java  |   5 +-
 .../jms/client/ActiveMQMessageConsumer.java     |   8 +
 .../artemis/jms/client/JMSExceptionHelper.java  |  10 +
 .../jms/client/JMSMessageListenerWrapper.java   |   3 +
 .../artemis/core/paging/PagingStoreFactory.java |   5 +
 .../core/paging/cursor/NonExistentPage.java     |  43 ++++
 .../core/paging/cursor/PageCursorProvider.java  |   3 +
 .../core/paging/cursor/PageSubscription.java    |   2 +-
 .../paging/cursor/impl/LivePageCacheImpl.java   |   6 +-
 .../cursor/impl/PageCursorProviderImpl.java     |  91 ++++---
 .../cursor/impl/PageSubscriptionImpl.java       |  53 +++-
 .../activemq/artemis/core/paging/impl/Page.java |   4 +-
 .../paging/impl/PageTransactionInfoImpl.java    |  20 ++
 .../core/paging/impl/PagingStoreFactoryNIO.java |   7 +
 .../core/paging/impl/PagingStoreImpl.java       |  11 +-
 .../impl/journal/JournalStorageManager.java     |   6 +-
 .../impl/journal/LargeServerMessageImpl.java    |  15 +-
 .../postoffice/impl/DuplicateIDCacheImpl.java   |   2 +-
 .../server/impl/RemotingServiceImpl.java        |   2 +
 .../core/server/impl/ActiveMQServerImpl.java    |   8 +-
 .../artemis/core/server/impl/RefsOperation.java |   5 +
 .../core/server/impl/ServerConsumerImpl.java    |   8 +
 .../tests/integration/client/PagingTest.java    | 254 +++++++++++++++++++
 .../core/paging/impl/PagingStoreImplTest.java   |  14 +-
 44 files changed, 828 insertions(+), 279 deletions(-)
----------------------------------------------------------------------



[3/4] activemq-artemis git commit: ARTEMIS-525 Interrupted Thread should throw JMS Exceptions over the JMS layer

Posted by jb...@apache.org.
ARTEMIS-525 Interrupted Thread should throw JMS Exceptions over the JMS layer


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a75bd763
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a75bd763
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a75bd763

Branch: refs/heads/master
Commit: a75bd7630af24b6b48e5cfa23fa5c2a4aacda9cc
Parents: 3e2adf1
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue May 17 19:33:36 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue May 17 20:28:40 2016 -0400

----------------------------------------------------------------------
 .../activemq/artemis/jms/client/ActiveMQMessageConsumer.java   | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a75bd763/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index 8929fa5..53242f1 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -218,6 +218,7 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
                jmsMsg.doBeforeReceive();
             }
             catch (IndexOutOfBoundsException ioob) {
+               ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
                // In case this exception happen you will need to know where it happened.
                // it has been a bug here in the past, and this was used to debug it.
                // nothing better than keep it for future investigations in case it happened again
@@ -240,6 +241,11 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
          return jmsMsg;
       }
       catch (ActiveMQException e) {
+         ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
+         throw JMSExceptionHelper.convertFromActiveMQException(e);
+      }
+      catch (ActiveMQInterruptedException e) {
+         ((ClientSessionInternal)session.getCoreSession()).markRollbackOnly();
          throw JMSExceptionHelper.convertFromActiveMQException(e);
       }
    }