You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:45 UTC

[36/42] activemq-artemis git commit: ARTEMIS-463 Refactoring on Openwire https://issues.apache.org/jira/browse/ARTEMIS-463

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 926aebd..4675dca 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -16,9 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
+import javax.jms.ResourceAllocationException;
 import javax.transaction.xa.Xid;
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -26,52 +25,56 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.paging.PagingStore;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.IDGenerator;
+import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.ProducerAck;
-import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
-import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
-import org.apache.activemq.artemis.core.protocol.openwire.SendingResult;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
-import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback {
-   private AMQServerSession coreSession;
+
+   // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session
+   protected final IDGenerator idGenerator = new SimpleIDGenerator(0);
+
    private ConnectionInfo connInfo;
+   private AMQServerSession coreSession;
    private SessionInfo sessInfo;
    private ActiveMQServer server;
    private OpenWireConnection connection;
 
    private Map<Long, AMQConsumer> consumers = new ConcurrentHashMap<>();
 
-   private Map<Long, AMQProducer> producers = new HashMap<>();
-
    private AtomicBoolean started = new AtomicBoolean(false);
 
    private TransactionId txId = null;
@@ -82,6 +85,11 @@ public class AMQSession implements SessionCallback {
 
    private OpenWireProtocolManager manager;
 
+   // The sessionWireformat used by the session
+   // this object is meant to be used per thread / session
+   // so we make a new one per AMQSession
+   private final OpenWireMessageConverter converter;
+
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -90,10 +98,18 @@ public class AMQSession implements SessionCallback {
                      OpenWireProtocolManager manager) {
       this.connInfo = connInfo;
       this.sessInfo = sessInfo;
+
       this.server = server;
       this.connection = connection;
       this.scheduledPool = scheduledPool;
       this.manager = manager;
+      OpenWireFormat marshaller = (OpenWireFormat) connection.getMarshaller();
+
+      this.converter = new OpenWireMessageConverter(marshaller.copy());
+   }
+
+   public OpenWireMessageConverter getConverter() {
+      return converter;
    }
 
    public void initialize() {
@@ -106,7 +122,7 @@ public class AMQSession implements SessionCallback {
       // now
 
       try {
-         coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, new AMQServerSessionFactory(), true);
+         coreSession = (AMQServerSession) server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, AMQServerSessionFactory.getInstance(), true);
 
          long sessionId = sessInfo.getSessionId().getValue();
          if (sessionId == -1) {
@@ -119,7 +135,9 @@ public class AMQSession implements SessionCallback {
 
    }
 
-   public void createConsumer(ConsumerInfo info, AMQSession amqSession) throws Exception {
+   public List<AMQConsumer> createConsumer(ConsumerInfo info,
+                              AMQSession amqSession,
+                              SlowConsumerDetectionListener slowConsumerDetectionListener) throws Exception {
       //check destination
       ActiveMQDestination dest = info.getDestination();
       ActiveMQDestination[] dests = null;
@@ -129,29 +147,46 @@ public class AMQSession implements SessionCallback {
       else {
          dests = new ActiveMQDestination[]{dest};
       }
-      Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
-      for (ActiveMQDestination d : dests) {
-         if (d.isQueue()) {
-            SimpleString queueName = OpenWireUtil.toCoreAddress(d);
+//      Map<ActiveMQDestination, AMQConsumer> consumerMap = new HashMap<>();
+      List<AMQConsumer> consumersList = new java.util.LinkedList<>();
+
+      for (ActiveMQDestination openWireDest : dests) {
+         if (openWireDest.isQueue()) {
+            SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
             getCoreServer().getJMSQueueCreator().create(queueName);
          }
-         AMQConsumer consumer = new AMQConsumer(this, d, info, scheduledPool);
-         consumer.init();
-         consumerMap.put(d, consumer);
+         AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
+
+         consumer.init(slowConsumerDetectionListener, idGenerator.generateID());
+         consumersList.add(consumer);
          consumers.put(consumer.getNativeId(), consumer);
       }
-      connection.addConsumerBrokerExchange(info.getConsumerId(), amqSession, consumerMap);
+
+      return consumersList;
+   }
+
+   public void start() {
 
       coreSession.start();
       started.set(true);
+
    }
 
+   // rename actualDest to destination
    @Override
    public void afterDelivery() throws Exception {
 
    }
 
    @Override
+   public void browserFinished(ServerConsumer consumer) {
+      AMQConsumer theConsumer = ((AMQServerConsumer) consumer).getAmqConsumer();
+      if (theConsumer != null) {
+         theConsumer.browseFinished();
+      }
+   }
+
+   @Override
    public boolean isWritable(ReadyListener callback) {
       return connection.isWritable(callback);
    }
@@ -197,49 +232,26 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public boolean hasCredits(ServerConsumer consumerID) {
-      AMQConsumer amqConsumer = consumers.get(consumerID.getID());
-      return amqConsumer.hasCredits();
-   }
-
-   @Override
-   public void disconnect(ServerConsumer consumerId, String queueName) {
-      // TODO Auto-generated method stub
-
-   }
-
-   public AMQServerSession getCoreSession() {
-      return this.coreSession;
-   }
-
-   public ActiveMQServer getCoreServer() {
-      return this.server;
-   }
 
-   public void removeConsumer(long consumerId) throws Exception {
-      boolean failed = !(this.txId != null || this.isTx);
+      AMQConsumer amqConsumer;
 
-      coreSession.amqCloseConsumer(consumerId, failed);
-      consumers.remove(consumerId);
-   }
+      amqConsumer = consumers.get(consumerID.getID());
 
-   public void createProducer(ProducerInfo info) throws Exception {
-      AMQProducer producer = new AMQProducer(this, info);
-      producer.init();
-      producers.put(info.getProducerId().getValue(), producer);
+      if (amqConsumer != null) {
+         return amqConsumer.hasCredits();
+      }
+      return false;
    }
 
-   public void removeProducer(ProducerInfo info) {
-      removeProducer(info.getProducerId());
-   }
+   @Override
+   public void disconnect(ServerConsumer consumerId, String queueName) {
+      // TODO Auto-generated method stub
 
-   public void removeProducer(ProducerId id) {
-      producers.remove(id.getValue());
    }
 
-   public SendingResult send(AMQProducerBrokerExchange producerExchange,
-                             Message messageSend,
-                             boolean sendProducerAck) throws Exception {
-      SendingResult result = new SendingResult();
+   public void send(final ProducerInfo producerInfo,
+                    final Message messageSend,
+                    boolean sendProducerAck) throws Exception {
       TransactionId tid = messageSend.getTransactionId();
       if (tid != null) {
          resetSessionTx(tid);
@@ -251,41 +263,132 @@ public class AMQSession implements SessionCallback {
       ActiveMQDestination[] actualDestinations = null;
       if (destination.isComposite()) {
          actualDestinations = destination.getCompositeDestinations();
+         messageSend.setOriginalDestination(destination);
       }
       else {
          actualDestinations = new ActiveMQDestination[]{destination};
       }
 
-      for (ActiveMQDestination dest : actualDestinations) {
-         ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024);
+      ServerMessage originalCoreMsg = getConverter().inbound(messageSend);
 
-         /* ActiveMQ failover transport will attempt to reconnect after connection failure.  Any sent messages that did
-         * not receive acks will be resent.  (ActiveMQ broker handles this by returning a last sequence id received to
-         * the client).  To handle this in Artemis we use a duplicate ID cache.  To do this we check to see if the
-         * message comes from failover connection.  If so we add a DUPLICATE_ID to handle duplicates after a resend. */
-         if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
-            coreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+      /* ActiveMQ failover transport will attempt to reconnect after connection failure.  Any sent messages that did
+      * not receive acks will be resent.  (ActiveMQ broker handles this by returning a last sequence id received to
+      * the client).  To handle this in Artemis we use a duplicate ID cache.  To do this we check to see if the
+      * message comes from failover connection.  If so we add a DUPLICATE_ID to handle duplicates after a resend. */
+      if (connection.getContext().isFaultTolerant() && !messageSend.getProperties().containsKey(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID)) {
+         originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
+      }
+
+      Runnable runnable;
+
+      if (sendProducerAck) {
+         runnable = new Runnable() {
+            public void run() {
+               try {
+                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+                  connection.dispatchSync(ack);
+               }
+               catch (Exception e) {
+                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
+                  connection.sendException(e);
+               }
+
+            }
+         };
+      }
+      else {
+         final Connection transportConnection = connection.getTransportConnection();
+
+         //         new Exception("Setting to false").printStackTrace();
+
+         if (transportConnection == null) {
+            // I don't think this could happen, but just in case, avoiding races
+            runnable = null;
+         }
+         else {
+            runnable = new Runnable() {
+               public void run() {
+                  transportConnection.setAutoRead(true);
+               }
+            };
          }
-         OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller());
-         SimpleString address = OpenWireUtil.toCoreAddress(dest);
-         coreMsg.setAddress(address);
+      }
+
+      internalSend(actualDestinations, originalCoreMsg, runnable);
+   }
+
+   private void internalSend(ActiveMQDestination[] actualDestinations,
+                             ServerMessage originalCoreMsg,
+                             final Runnable onComplete) throws Exception {
+
+      Runnable runToUse;
+
+      if (actualDestinations.length <= 1 || onComplete == null) {
+         // if onComplete is null, this will be null ;)
+         runToUse = onComplete;
+      }
+      else {
+         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
+         runToUse = new Runnable() {
+            @Override
+            public void run() {
+               if (count.decrementAndGet() == 0) {
+                  onComplete.run();
+               }
+            }
+         };
+      }
+
+      SimpleString[] addresses = new SimpleString[actualDestinations.length];
+      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
+
+      // We fillup addresses, pagingStores and we will throw failure if that's the case
+      for (int i = 0; i < actualDestinations.length; i++) {
+         ActiveMQDestination dest = actualDestinations[i];
+         addresses[i] = OpenWireUtil.toCoreAddress(dest);
+         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
+         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
+            throw new ResourceAllocationException("Queue is full");
+         }
+      }
+
+      for (int i = 0; i < actualDestinations.length; i++) {
+
+         ServerMessage coreMsg = originalCoreMsg.copy();
+
+         coreMsg.setAddress(addresses[i]);
+
+         PagingStore store = pagingStores[i];
 
-         PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
          if (store.isFull()) {
-            result.setBlockNextSend(true);
-            result.setBlockPagingStore(store);
-            result.setBlockingAddress(address);
-            //now we hold this message send until the store has space.
-            //we do this by put it in a scheduled task
-            ScheduledExecutorService scheduler = server.getScheduledPool();
-            Runnable sendRetryTask = new SendRetryTask(coreMsg, producerExchange, sendProducerAck, messageSend.getSize(), messageSend.getCommandId());
-            scheduler.schedule(sendRetryTask, 10, TimeUnit.MILLISECONDS);
+            connection.getTransportConnection().setAutoRead(false);
          }
-         else {
-            coreSession.send(coreMsg, false);
+
+         getCoreSession().send(coreMsg, false);
+
+         if (runToUse != null) {
+            // if the timeout is >0, it will wait this much milliseconds
+            // before running the the runToUse
+            // this will eventually unblock blocked destinations
+            // playing flow control
+            store.checkMemory(runToUse);
          }
       }
-      return result;
+   }
+
+   public AMQServerSession getCoreSession() {
+      return this.coreSession;
+   }
+
+   public ActiveMQServer getCoreServer() {
+      return this.server;
+   }
+
+   public void removeConsumer(long consumerId) throws Exception {
+      boolean failed = !(this.txId != null || this.isTx);
+
+      coreSession.amqCloseConsumer(consumerId, failed);
+      consumers.remove(consumerId);
    }
 
    public WireFormat getMarshaller() {
@@ -449,87 +552,17 @@ public class AMQSession implements SessionCallback {
       return consumers.get(coreConsumerId);
    }
 
-   private class SendRetryTask implements Runnable {
-
-      private ServerMessage coreMsg;
-      private AMQProducerBrokerExchange producerExchange;
-      private boolean sendProducerAck;
-      private int msgSize;
-      private int commandId;
-
-      public SendRetryTask(ServerMessage coreMsg,
-                           AMQProducerBrokerExchange producerExchange,
-                           boolean sendProducerAck,
-                           int msgSize,
-                           int commandId) {
-         this.coreMsg = coreMsg;
-         this.producerExchange = producerExchange;
-         this.sendProducerAck = sendProducerAck;
-         this.msgSize = msgSize;
-         this.commandId = commandId;
-      }
-
-      @Override
-      public void run() {
-         synchronized (AMQSession.this) {
-            try {
-               // check pageStore
-               SimpleString address = coreMsg.getAddress();
-               PagingStoreImpl store = (PagingStoreImpl) server.getPagingManager().getPageStore(address);
-               if (store.isFull()) {
-                  // if store is still full, schedule another
-                  server.getScheduledPool().schedule(this, 10, TimeUnit.MILLISECONDS);
-               }
-               else {
-                  // now send the message again.
-                  coreSession.send(coreMsg, false);
-
-                  if (sendProducerAck) {
-                     ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
-                     ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), msgSize);
-                     connection.dispatchAsync(ack);
-                  }
-                  else {
-                     Response response = new Response();
-                     response.setCorrelationId(commandId);
-                     connection.dispatchAsync(response);
-                  }
-               }
-            }
-            catch (Exception e) {
-               ExceptionResponse response = new ExceptionResponse(e);
-               response.setCorrelationId(commandId);
-               connection.dispatchAsync(response);
-            }
+   public void updateConsumerPrefetchSize(ConsumerId consumerId, int prefetch) {
+      Iterator<AMQConsumer> iterator = consumers.values().iterator();
+      while (iterator.hasNext()) {
+         AMQConsumer consumer = iterator.next();
+         if (consumer.getId().equals(consumerId)) {
+            consumer.setPrefetchSize(prefetch);
          }
-
       }
    }
 
-   public void blockingWaitForSpace(AMQProducerBrokerExchange producerExchange,
-                                    SendingResult result) throws IOException {
-      long start = System.currentTimeMillis();
-      long nextWarn = start;
-      producerExchange.blockingOnFlowControl(true);
-
-      AMQConnectionContext context = producerExchange.getConnectionContext();
-      PagingStoreImpl store = result.getBlockPagingStore();
-
-      //Destination.DEFAULT_BLOCKED_PRODUCER_WARNING_INTERVAL
-      long blockedProducerWarningInterval = 30000;
-      ProducerId producerId = producerExchange.getProducerState().getInfo().getProducerId();
-
-      while (store.isFull()) {
-         if (context.getStopping().get()) {
-            throw new IOException("Connection closed, send aborted.");
-         }
-
-         long now = System.currentTimeMillis();
-         if (now >= nextWarn) {
-            ActiveMQServerLogger.LOGGER.memoryLimitReached(producerId.toString(), result.getBlockingAddress().toString(), ((now - start) / 1000));
-            nextWarn = now + blockedProducerWarningInterval;
-         }
-      }
-      producerExchange.blockingOnFlowControl(false);
+   public OpenWireConnection getConnection() {
+      return connection;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
deleted file mode 100644
index 0e192db..0000000
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/BrowserListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.protocol.openwire.amq;
-
-interface BrowserListener {
-
-   void browseFinished();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
new file mode 100644
index 0000000..1c64676
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.protocol.openwire.util;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.XATransactionId;
+import org.apache.activemq.util.ByteSequence;
+
+public class OpenWireUtil {
+
+   public static ActiveMQBuffer toActiveMQBuffer(ByteSequence bytes) {
+      ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bytes.length);
+
+      buffer.writeBytes(bytes.data, bytes.offset, bytes.length);
+      return buffer;
+   }
+
+   public static SimpleString toCoreAddress(ActiveMQDestination dest) {
+      if (dest.isQueue()) {
+         return new SimpleString("jms.queue." + dest.getPhysicalName());
+      }
+      else {
+         return new SimpleString("jms.topic." + dest.getPhysicalName());
+      }
+   }
+
+   /**
+    * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
+    * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was
+    * set on publish/send so a divert or wildcard may mean thats its different to the destination subscribed to by the
+    * consumer
+    */
+   public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
+      String address = message.getAddress().toString();
+      String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", "");
+      if (actualDestination.isQueue()) {
+         return new ActiveMQQueue(strippedAddress);
+      }
+      else {
+         return new ActiveMQTopic(strippedAddress);
+      }
+   }
+
+   /*
+    *This util converts amq wildcards to compatible core wildcards
+    *The conversion is like this:
+    *AMQ * wildcard --> Core * wildcard (no conversion)
+    *AMQ > wildcard --> Core # wildcard
+    */
+   public static String convertWildcard(String physicalName) {
+      return physicalName.replaceAll("(\\.>)+", ".#");
+   }
+
+   public static XidImpl toXID(TransactionId xaXid) {
+      return toXID((XATransactionId)xaXid);
+   }
+
+   public static XidImpl toXID(XATransactionId xaXid) {
+      return new XidImpl(xaXid.getBranchQualifier(), xaXid.getFormatId(), xaXid.getGlobalTransactionId());
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index e94e0bc..a6cbe71 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -113,6 +113,11 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
+   public void browserFinished(ServerConsumer consumer) {
+
+   }
+
+   @Override
    public int sendMessage(ServerMessage serverMessage, final ServerConsumer consumer, int deliveryCount) {
       LargeServerMessageImpl largeMessage = null;
       ServerMessage newServerMessage = serverMessage;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index d52f53f..4a24b57 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -947,4 +947,8 @@ public interface Configuration {
    StoreConfiguration getStoreConfiguration();
 
    Configuration setStoreConfiguration(StoreConfiguration storeConfiguration);
+
+   /** It will return all the connectors in a toString manner for debug purposes. */
+   String debugConnectors();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 7784a01..1a9690f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -21,7 +21,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
 import java.io.Serializable;
+import java.io.StringWriter;
 import java.lang.reflect.Array;
 import java.net.URI;
 import java.security.AccessController;
@@ -1299,6 +1301,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
    public TransportConfiguration[] getTransportConfigurations(final List<String> connectorNames) {
       TransportConfiguration[] tcConfigs = (TransportConfiguration[]) Array.newInstance(TransportConfiguration.class, connectorNames.size());
       int count = 0;
+      System.out.println(debugConnectors());
+
       for (String connectorName : connectorNames) {
          TransportConfiguration connector = getConnectorConfigurations().get(connectorName);
 
@@ -1314,6 +1318,21 @@ public class ConfigurationImpl implements Configuration, Serializable {
       return tcConfigs;
    }
 
+   public String debugConnectors() {
+      StringWriter stringWriter = new StringWriter();
+      PrintWriter writer = new PrintWriter(stringWriter);
+
+
+      for (Map.Entry<String, TransportConfiguration> connector : getConnectorConfigurations().entrySet()) {
+         writer.println("Connector::" + connector.getKey() + " value = " + connector.getValue());
+      }
+
+      writer.close();
+
+      return stringWriter.toString();
+
+   }
+
    @Override
    public boolean isResolveProtocols() {
       return resolveProtocols;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index e831966..566b91a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -126,6 +126,8 @@ public interface PagingStore extends ActiveMQComponent {
 
    boolean checkMemory(Runnable runnable);
 
+   boolean isFull();
+
    /**
     * Write lock the PagingStore.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index 0b74fd7..c05a288 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -98,6 +98,10 @@ public final class CoreSessionCallback implements SessionCallback {
       channel.send(packet);
    }
 
+   @Override
+   public void browserFinished(ServerConsumer consumer) {
+
+   }
 
    @Override
    public void afterDelivery() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
index 70d6289..db61f89 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java
@@ -138,6 +138,12 @@ public class InVMConnection implements Connection {
    }
 
    @Override
+   public void setAutoRead(boolean autoRead) {
+      // nothing to be done on the INVM.
+      // maybe we could eventually implement something, but not needed now
+   }
+
+   @Override
    public ActiveMQBuffer createTransportBuffer(final int size) {
       return ActiveMQBuffers.dynamicBuffer(size);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index efbc1ea..34cc8cf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -169,6 +169,8 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    private final long connectionsAllowed;
 
+   private Map<String, Object> extraConfigs;
+
    public NettyAcceptor(final String name,
                         final ClusterConnection clusterConnection,
                         final Map<String, Object> configuration,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
index d2cde4b..795bbb5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java
@@ -146,7 +146,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
       this.flushExecutor = flushExecutor;
 
       ActiveMQServerLogger.LOGGER.addingProtocolSupport(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.getModuleName());
-//      this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
+      //      this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory.createProtocolManager(server, coreProtocolManagerFactory.filterInterceptors(incomingInterceptors), coreProtocolManagerFactory.filterInterceptors(outgoingInterceptors)));
       this.protocolMap.put(coreProtocolManagerFactory.getProtocols()[0], coreProtocolManagerFactory);
 
       if (config.isResolveProtocols()) {
@@ -206,8 +206,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
          @Override
          public ThreadFactory run() {
             return new ActiveMQThreadFactory("ActiveMQ-remoting-threads-" + server.toString() +
-                                                                        "-" +
-                                                                        System.identityHashCode(this), false, Thread.currentThread().getContextClassLoader());
+                                                "-" +
+                                                System.identityHashCode(this), false, Thread.currentThread().getContextClassLoader());
          }
       });
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index e3c1b2a..64633bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -243,6 +243,10 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    Queue locateQueue(SimpleString queueName);
 
+   BindingQueryResult bindingQuery(SimpleString address) throws Exception;
+
+   QueueQueryResult queueQuery(SimpleString name) throws Exception;
+
    void destroyQueue(SimpleString queueName) throws Exception;
 
    void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 6045e2c..d75efdd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -25,6 +25,12 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
  */
 public interface ServerConsumer extends Consumer {
 
+   void setlowConsumerDetection(SlowConsumerDetectionListener listener);
+
+   SlowConsumerDetectionListener getSlowConsumerDetecion();
+
+   void fireSlowConsumer();
+
    /**
     * @param protocolContext
     * @see #getProtocolContext()

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
new file mode 100644
index 0000000..0c60f25
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/SlowConsumerDetectionListener.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.server;
+
+public interface SlowConsumerDetectionListener {
+   void onSlowConsumer(ServerConsumer consumer);
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java
index ef384e0..e3a583f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/embedded/EmbeddedActiveMQ.java
@@ -69,6 +69,11 @@ public class EmbeddedActiveMQ {
     * @return
     */
    public boolean waitClusterForming(long timeWait, TimeUnit unit, int iterations, int servers) throws Exception {
+      if (activeMQServer.getClusterManager().getClusterConnections() == null ||
+         activeMQServer.getClusterManager().getClusterConnections().size() == 0) {
+         return servers == 0;
+      }
+
       for (int i = 0; i < iterations; i++) {
          for (ClusterConnection connection : activeMQServer.getClusterManager().getClusterConnections()) {
             if (connection.getTopology().getMembers().size() == servers) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 7554127..13a1283 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -76,6 +77,8 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@@ -97,6 +100,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Bindable;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -105,6 +109,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueCreator;
 import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.ServerSessionFactory;
@@ -545,6 +550,72 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public BindingQueryResult bindingQuery(SimpleString address) throws Exception {
+      if (address == null) {
+         throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
+      }
+
+      boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
+
+      List<SimpleString> names = new ArrayList<>();
+
+      // make an exception for the management address (see HORNETQ-29)
+      ManagementService managementService = getManagementService();
+      if (managementService != null) {
+         if (address.equals(managementService.getManagementAddress())) {
+            return new BindingQueryResult(true, names, autoCreateJmsQueues);
+         }
+      }
+
+      Bindings bindings = getPostOffice().getMatchingBindings(address);
+
+      for (Binding binding : bindings.getBindings()) {
+         if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
+            names.add(binding.getUniqueName());
+         }
+      }
+
+      return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
+   }
+
+   @Override
+   public QueueQueryResult queueQuery(SimpleString name) {
+      if (name == null) {
+         throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
+      }
+
+      boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
+
+      QueueQueryResult response;
+
+      Binding binding = getPostOffice().getBinding(name);
+
+      SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
+
+      if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
+         Queue queue = (Queue) binding.getBindable();
+
+         Filter filter = queue.getFilter();
+
+         SimpleString filterString = filter == null ? null : filter.getFilterString();
+
+         response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
+      }
+      // make an exception for the management address (see HORNETQ-29)
+      else if (name.equals(managementAddress)) {
+         response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
+      }
+      else if (autoCreateJmsQueues) {
+         response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
+      }
+      else {
+         response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
+      }
+
+      return response;
+   }
+
+   @Override
    public void threadDump() {
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8bf5d08..86ca36c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2930,6 +2930,8 @@ public class QueueImpl implements Queue {
                      }
                   }
 
+                  serverConsumer.fireSlowConsumer();
+
                   if (connection != null) {
                      ActiveMQServerLogger.LOGGER.slowConsumerDetected(serverConsumer.getSessionID(), serverConsumer.getID(), getName().toString(), connection.getRemoteAddress(), threshold, consumerRate);
                      if (policy.equals(SlowConsumerPolicy.KILL)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 422d324..14f22ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -90,6 +91,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
    private final ActiveMQServer server;
 
+   private SlowConsumerDetectionListener slowConsumerListener;
+
    /**
     * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
     * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
@@ -228,6 +231,23 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    // ----------------------------------------------------------------------
 
    @Override
+   public void setlowConsumerDetection(SlowConsumerDetectionListener listener) {
+      this.slowConsumerListener = listener;
+   }
+
+   @Override
+   public SlowConsumerDetectionListener getSlowConsumerDetecion() {
+      return slowConsumerListener;
+   }
+
+   @Override
+   public void fireSlowConsumer() {
+      if (slowConsumerListener != null) {
+         slowConsumerListener.onSlowConsumer(this);
+      }
+   }
+
+   @Override
    public Object getProtocolContext() {
       return protocolContext;
    }
@@ -546,12 +566,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
                else {
                   refs.add(ref);
-                  if (!failed) {
-                     // We don't decrement delivery count if the client failed, since there's a possibility that refs
-                     // were actually delivered but we just didn't get any acks for them
-                     // before failure
-                     ref.decrementDeliveryCount();
-                  }
+                  updateDeliveryCountForCanceledRef(ref, failed);
                }
 
                if (isTrace) {
@@ -566,6 +581,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       return refs;
    }
 
+   protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
+      if (!failed) {
+         // We don't decrement delivery count if the client failed, since there's a possibility that refs
+         // were actually delivered but we just didn't get any acks for them
+         // before failure
+         ref.decrementDeliveryCount();
+      }
+   }
+
    @Override
    public void setStarted(final boolean started) {
       synchronized (lock) {
@@ -1191,6 +1215,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                ref = null;
                synchronized (messageQueue) {
                   if (!iterator.hasNext()) {
+                     callback.browserFinished(ServerConsumerImpl.this);
                      break;
                   }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d628bde..77705fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
@@ -623,63 +622,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
-      if (name == null) {
-         throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
-      }
-
-      boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
-
-      QueueQueryResult response;
-
-      Binding binding = postOffice.getBinding(name);
-
-      if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
-         Queue queue = (Queue) binding.getBindable();
-
-         Filter filter = queue.getFilter();
-
-         SimpleString filterString = filter == null ? null : filter.getFilterString();
-
-         response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
-      }
-      // make an exception for the management address (see HORNETQ-29)
-      else if (name.equals(managementAddress)) {
-         response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
-      }
-      else if (autoCreateJmsQueues) {
-         response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
-      }
-      else {
-         response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
-      }
-
-      return response;
+      return server.queueQuery(name);
    }
 
    @Override
    public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
-      if (address == null) {
-         throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
-      }
-
-      boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
-
-      List<SimpleString> names = new ArrayList<>();
-
-      // make an exception for the management address (see HORNETQ-29)
-      if (address.equals(managementAddress)) {
-         return new BindingQueryResult(true, names, autoCreateJmsQueues);
-      }
-
-      Bindings bindings = postOffice.getMatchingBindings(address);
-
-      for (Binding binding : bindings.getBindings()) {
-         if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
-            names.add(binding.getUniqueName());
-         }
-      }
-
-      return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
+      return server.bindingQuery(address);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 3309fab..4b53ec6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -70,6 +70,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    public static final SlowConsumerPolicy DEFAULT_SLOW_CONSUMER_POLICY = SlowConsumerPolicy.NOTIFY;
 
+   public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+
    private AddressFullMessagePolicy addressFullMessagePolicy = null;
 
    private Long maxSizeBytes = null;
@@ -114,6 +116,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
 
+   //from amq5
+   //make it transient
+   private transient Integer queuePrefetch = null;
+
    public AddressSettings(AddressSettings other) {
       this.addressFullMessagePolicy = other.addressFullMessagePolicy;
       this.maxSizeBytes = other.maxSizeBytes;
@@ -137,6 +143,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.autoCreateJmsQueues = other.autoCreateJmsQueues;
       this.autoDeleteJmsQueues = other.autoDeleteJmsQueues;
       this.managementBrowsePageSize = other.managementBrowsePageSize;
+      this.queuePrefetch = other.queuePrefetch;
    }
 
    public AddressSettings() {
@@ -333,6 +340,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public int getQueuePrefetch() {
+      return queuePrefetch != null ? queuePrefetch : AddressSettings.DEFAULT_QUEUE_PREFETCH;
+   }
+
+   public AddressSettings setQueuePrefetch(int queuePrefetch) {
+      this.queuePrefetch = queuePrefetch;
+      return this;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -403,6 +419,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (managementBrowsePageSize == null) {
          managementBrowsePageSize = merged.managementBrowsePageSize;
       }
+      if (queuePrefetch == null) {
+         queuePrefetch = merged.queuePrefetch;
+      }
    }
 
    @Override
@@ -569,6 +588,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((autoCreateJmsQueues == null) ? 0 : autoCreateJmsQueues.hashCode());
       result = prime * result + ((autoDeleteJmsQueues == null) ? 0 : autoDeleteJmsQueues.hashCode());
       result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
+      result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
       return result;
    }
 
@@ -718,6 +738,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       }
       else if (!managementBrowsePageSize.equals(other.managementBrowsePageSize))
          return false;
+      if (queuePrefetch == null) {
+         if (other.queuePrefetch != null)
+            return false;
+      }
+      else if (!queuePrefetch.equals(other.queuePrefetch))
+         return false;
       return true;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 4b27bc4..a9eb0f2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -50,4 +50,7 @@ public interface SessionCallback {
    void disconnect(ServerConsumer consumerId, String queueName);
 
    boolean isWritable(ReadyListener callback);
+
+   /** Some protocols (Openwire) needs a special message with the browser is finished. */
+   void browserFinished(ServerConsumer consumer);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
index 846d31b..232d3ae 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ThreadLeakCheckRule.java
@@ -17,7 +17,9 @@
 
 package org.apache.activemq.artemis.tests.util;
 
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.junit.Assert;
 import org.junit.rules.ExternalResource;
@@ -26,6 +28,7 @@ import org.junit.rules.ExternalResource;
  * This is useful to make sure you won't have leaking threads between tests
  */
 public class ThreadLeakCheckRule extends ExternalResource {
+   private static Set<String> extraThreads = new HashSet<String>();
 
    boolean enabled = true;
 
@@ -94,6 +97,11 @@ public class ThreadLeakCheckRule extends ExternalResource {
 
    }
 
+   public static void addExtraThreads(String... threads) {
+      for (String th : threads) {
+         extraThreads.add(th);
+      }
+   }
 
    private boolean checkThread() {
       boolean failedThread = false;
@@ -183,6 +191,9 @@ public class ThreadLeakCheckRule extends ExternalResource {
          // Static workers used by MQTT client.
          return true;
       }
+      else if (extraThreads.contains(threadName)) {
+         return true;
+      }
       else {
          for (StackTraceElement element : thread.getStackTrace()) {
             if (element.getClassName().contains("org.jboss.byteman.agent.TransformListener")) {
@@ -194,4 +205,7 @@ public class ThreadLeakCheckRule extends ExternalResource {
    }
 
 
+   public static void clearExtraThreads() {
+      extraThreads.clear();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
index c57845d..8a64a85 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
@@ -116,7 +116,7 @@ public class JmsRollbackRedeliveryTest {
             Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
             Destination destination = session.createQueue(destinationName);
             MessageConsumer consumer = session.createConsumer(destination);
-            TextMessage msg = (TextMessage) consumer.receive(5000);
+            TextMessage msg = (TextMessage) consumer.receive(6000000);
             if (msg != null) {
                if (rolledback.put(msg.getText(), Boolean.TRUE) != null) {
                   LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
index 0b62b31..48c36cf 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
@@ -54,12 +54,12 @@ public class SoWriteTimeoutClientTest extends OpenwireArtemisBaseTest {
    @BeforeClass
    public static void beforeTest() throws Exception {
       //this thread keeps alive in original test too. Exclude it.
-      ThreadLeakCheckRule.addKownThread("WriteTimeoutFilter-Timeout");
+      ThreadLeakCheckRule.addExtraThreads("WriteTimeoutFilter-Timeout-1");
    }
 
    @AfterClass
    public static void afterTest() throws Exception {
-      ThreadLeakCheckRule.removeKownThread("WriteTimeoutFilter-Timeout");
+      ThreadLeakCheckRule.clearExtraThreads();
    }
 
    @Before

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
index 40cbccb..e44a490 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -38,7 +39,10 @@ import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
 import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.jboss.byteman.contrib.bmunit.BMRule;
@@ -83,8 +87,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
          targetLocation = "ENTRY",
          action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule(
          name = "stop broker before commit",
-         targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
-         targetMethod = "processCommitTransactionOnePhase",
+         targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+         targetMethod = "commit",
          targetLocation = "ENTRY",
          action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()"),})
    public void testFailoverConsumerDups() throws Exception {
@@ -177,10 +181,10 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
 
          @BMRule(
             name = "stop broker before commit",
-            targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
-            targetMethod = "processCommitTransactionOnePhase",
+            targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+            targetMethod = "commit",
             targetLocation = "ENTRY",
-            action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return null")})
+            action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction();return")})
    public void TestFailoverConsumerOutstandingSendTxIncomplete() throws Exception {
       doTestFailoverConsumerOutstandingSendTx(false);
    }
@@ -194,8 +198,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
          targetLocation = "ENTRY",
          action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.holdResponse($0)"), @BMRule(
          name = "stop broker after commit",
-         targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
-         targetMethod = "processCommitTransactionOnePhase",
+         targetClass = "org.apache.activemq.artemis.core.server.impl.ServerSessionImpl",
+         targetMethod = "commit",
          targetLocation = "AT EXIT",
          action = "org.apache.activemq.transport.failover.FailoverConsumerOutstandingCommitTest.stopServerInTransaction()")})
    public void TestFailoverConsumerOutstandingSendTxComplete() throws Exception {
@@ -232,13 +236,11 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
       testConsumer.setMessageListener(new MessageListener() {
 
          public void onMessage(Message message) {
-            LOG.info("consume one: " + message);
+            LOG.info("consume one and commit: " + message);
             assertNotNull("got message", message);
             receivedMessages.add((TextMessage) message);
             try {
-               LOG.info("send one");
                produceMessage(consumerSession, signalDestination, 1);
-               LOG.info("commit session");
                consumerSession.commit();
             }
             catch (JMSException e) {
@@ -270,8 +272,8 @@ public class FailoverConsumerOutstandingCommitTest extends OpenwireArtemisBaseTe
 
       // will be stopped by the plugin
       brokerStopLatch.await();
-      doByteman.set(false);
       server.stop();
+      doByteman.set(false);
       server = createBroker();
       server.start();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index e704274..8403ee3 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -519,31 +519,31 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
       Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
    }
 
-//   @Test
-//   @BMRules(
-//           rules = {
-//                   @BMRule(
-//                           name = "set no return response and stop the broker",
-//                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
-//                           targetMethod = "processMessageAck",
-//                           targetLocation = "ENTRY",
-//                           action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
-//           }
-//   )
-//   public void testFailoverConsumerAckLost() throws Exception {
-//      LOG.info(this + " running test testFailoverConsumerAckLost");
-//      // as failure depends on hash order of state tracker recovery, do a few times
-//      for (int i = 0; i < 3; i++) {
-//         try {
-//            LOG.info("Iteration: " + i);
-//            doTestFailoverConsumerAckLost(i);
-//         }
-//         finally {
-//            stopBroker();
-//         }
-//      }
-//   }
-//
+   @Test
+   @BMRules(
+           rules = {
+                   @BMRule(
+                           name = "set no return response and stop the broker",
+                           targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+                           targetMethod = "processMessageAck",
+                           targetLocation = "ENTRY",
+                           action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
+           }
+   )
+   public void testFailoverConsumerAckLost() throws Exception {
+      LOG.info(this + " running test testFailoverConsumerAckLost");
+      // as failure depends on hash order of state tracker recovery, do a few times
+      for (int i = 0; i < 3; i++) {
+         try {
+            LOG.info("Iteration: " + i);
+            doTestFailoverConsumerAckLost(i);
+         }
+         finally {
+            stopBroker();
+         }
+      }
+   }
+
    @SuppressWarnings("unchecked")
    public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
       broker = createBroker();
@@ -567,12 +567,12 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
          connection = cf.createConnection();
          connection.start();
          connections.add(connection);
-         final Session consumerSession1 = connection.createSession(true, Session.SESSION_TRANSACTED);
+         final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
          connection = cf.createConnection();
          connection.start();
          connections.add(connection);
-         final Session consumerSession2 = connection.createSession(true, Session.SESSION_TRANSACTED);
+         final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
          final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
          final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
@@ -583,7 +583,7 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
          final Vector<Message> receivedMessages = new Vector<>();
          final CountDownLatch commitDoneLatch = new CountDownLatch(1);
          final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
-         Thread t = new Thread("doTestFailoverConsumerAckLost(" + pauseSeconds + ")") {
+         new Thread() {
             public void run() {
                LOG.info("doing async commit after consume...");
                try {
@@ -630,16 +630,10 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
                   e.printStackTrace();
                }
             }
-         };
-         t.start();
+         }.start();
 
          // will be stopped by the plugin
          brokerStopLatch.await(60, TimeUnit.SECONDS);
-         t.join(30000);
-         if (t.isAlive()) {
-            t.interrupt();
-            Assert.fail("Thread " + t.getName() + " is still alive");
-         }
          broker = createBroker();
          broker.start();
          doByteman.set(false);
@@ -1062,10 +1056,8 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
             new Thread() {
                public void run() {
                   try {
-                     if (broker != null) {
-                        broker.stop();
-                        broker = null;
-                     }
+                     broker.stop();
+                     broker = null;
                      LOG.info("broker stopped.");
                   }
                   catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index a3bae65..54ae6c8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -50,10 +50,10 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.core.Packet;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.ServerSessionFactory;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueFactoryImpl;
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
@@ -507,7 +507,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
        * @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
        */
       @Override
-      public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
+      public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) {
          inCall.countDown();
          try {
             callbackSemaphore.acquire();
@@ -518,7 +518,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
          }
 
          try {
-            return targetCallback.sendMessage(ref, message, consumer, deliveryCount);
+            return targetCallback.sendMessage(message, consumer, deliveryCount);
          }
          finally {
             callbackSemaphore.release();
@@ -530,8 +530,8 @@ public class HangConsumerTest extends ActiveMQTestBase {
        * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
        */
       @Override
-      public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
-         return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount);
+      public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
+         return targetCallback.sendLargeMessage(message, consumer, bodySize, deliveryCount);
       }
 
       /* (non-Javadoc)
@@ -581,6 +581,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                                         String defaultAddress,
                                                         SessionCallback callback,
                                                         OperationContext context,
+                                                        ServerSessionFactory sessionFactory,
                                                         boolean autoCreateQueue) throws Exception {
          return new ServerSessionImpl(name, username, password, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
index 14cfee0..a1a5e38 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicSecurityTest.java
@@ -26,7 +26,6 @@ import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.command.ActiveMQQueue;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -119,7 +118,7 @@ public class BasicSecurityTest extends BasicOpenWireTest {
    }
 
    @Test
-      public void testSendnReceiveAuthorization() throws Exception {
+   public void testSendnReceiveAuthorization() throws Exception {
       Connection sendingConn = null;
       Connection receivingConn = null;
 
@@ -153,18 +152,16 @@ public class BasicSecurityTest extends BasicOpenWireTest {
          producer = sendingSession.createProducer(dest);
          producer.send(message);
 
-         MessageConsumer consumer;
+         MessageConsumer consumer = null;
          try {
             consumer = sendingSession.createConsumer(dest);
-            Assert.fail("exception expected");
          }
          catch (JMSSecurityException e) {
-            e.printStackTrace();
             //expected
          }
 
          consumer = receivingSession.createConsumer(dest);
-         TextMessage received = (TextMessage) consumer.receive(5000);
+         TextMessage received = (TextMessage) consumer.receive();
 
          assertNotNull(received);
          assertEquals("Hello World", received.getText());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6ddf486f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
index 69d9784..825b8b5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireUtilTest.java
@@ -18,7 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireUtil;
 import org.junit.Test;
 
 public class OpenWireUtilTest {