You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/11/29 16:52:13 UTC

[1/3] activemq-artemis git commit: ARTEMIS-1529 Fixing Ref count over asynchronous ack

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 4584ac697 -> a822af471


ARTEMIS-1529 Fixing Ref count over asynchronous ack


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8b7282d8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8b7282d8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8b7282d8

Branch: refs/heads/master
Commit: 8b7282d849fca896f6b9794a5ddfc251db947120
Parents: dbb3aad
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Nov 28 21:08:05 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 09:45:09 2017 -0500

----------------------------------------------------------------------
 .../artemis/utils/ReferenceCounter.java         | 15 ++++
 .../artemis/utils/ReferenceCounterUtil.java     | 54 ++++++++++---
 .../artemis/utils/ReferenceCounterTest.java     | 15 +++-
 .../amqp/broker/AMQPSessionCallback.java        | 33 ++++++--
 .../activemq/artemis/core/server/Queue.java     |  4 +
 .../artemis/core/server/impl/QueueImpl.java     | 20 +++++
 .../core/server/impl/QueueManagerImpl.java      | 80 ++++++++------------
 .../core/server/impl/ServerConsumerImpl.java    |  2 +
 .../server/impl/TransientQueueManagerImpl.java  | 41 ++++------
 .../impl/ScheduledDeliveryHandlerTest.java      |  5 ++
 .../integration/amqp/TopicDurableTests.java     | 39 ++++------
 .../tests/integration/client/ConsumerTest.java  | 18 ++++-
 .../unit/core/postoffice/impl/FakeQueue.java    |  6 ++
 13 files changed, 213 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java
index 2f46fb1..423b6b4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java
@@ -21,4 +21,19 @@ public interface ReferenceCounter {
    int increment();
 
    int decrement();
+
+   int getCount();
+
+
+   void setTask(Runnable task);
+
+   Runnable getTask();
+
+   /**
+    * Some asynchronous operations (like ack) may delay the moment certain conditions are met.
+    * Once they are met, we may need to recheck certain values during afterCompletion
+    * to make sure we don't end up with the condition having been met asynchronously and the queues not removed.
+    */
+   void check();
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java
index 3f971fd..3ef97a9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 public class ReferenceCounterUtil implements ReferenceCounter {
 
-   private final Runnable runnable;
+   private Runnable task;
 
    /**
     * If executor is null the runnable will be called within the same thread, otherwise the executor will be used
@@ -30,15 +30,35 @@ public class ReferenceCounterUtil implements ReferenceCounter {
 
    private final AtomicInteger uses = new AtomicInteger(0);
 
-   public ReferenceCounterUtil(Runnable runnable) {
-      this(runnable, null);
+   public ReferenceCounterUtil() {
+      this.executor = null;
+      this.task = null;
+   }
+
+   public ReferenceCounterUtil(Executor executor) {
+      this.executor = executor;
    }
 
    public ReferenceCounterUtil(Runnable runnable, Executor executor) {
-      this.runnable = runnable;
+      this.setTask(runnable);
       this.executor = executor;
    }
 
+   public ReferenceCounterUtil(Runnable runnable) {
+      this.setTask(runnable);
+      this.executor = null;
+   }
+
+   @Override
+   public void setTask(Runnable task) {
+      this.task = task;
+   }
+
+   @Override
+   public Runnable getTask() {
+      return task;
+   }
+
    @Override
    public int increment() {
       return uses.incrementAndGet();
@@ -48,13 +68,29 @@ public class ReferenceCounterUtil implements ReferenceCounter {
    public int decrement() {
       int value = uses.decrementAndGet();
       if (value == 0) {
-         if (executor != null) {
-            executor.execute(runnable);
-         } else {
-            runnable.run();
-         }
+         execute();
       }
 
       return value;
    }
+
+   private void execute() {
+      if (executor != null) {
+         executor.execute(task);
+      } else {
+         task.run();
+      }
+   }
+
+   @Override
+   public void check() {
+      if (getCount() <= 0) {
+         execute();
+      }
+   }
+
+   @Override
+   public int getCount() {
+      return uses.get();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java
index 865afff..7dbc9fb 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.artemis.utils;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -30,12 +29,13 @@ public class ReferenceCounterTest extends Assert {
 
    class LatchRunner implements Runnable {
 
-      final CountDownLatch latch = new CountDownLatch(1);
+      final ReusableLatch latch = new ReusableLatch(1);
       final AtomicInteger counts = new AtomicInteger(0);
-      volatile Thread lastThreadUsed;
+      volatile Thread lastThreadUsed = Thread.currentThread();
 
       @Override
       public void run() {
+         lastThreadUsed = Thread.currentThread();
          counts.incrementAndGet();
          latch.countDown();
       }
@@ -65,6 +65,15 @@ public class ReferenceCounterTest extends Assert {
 
       assertNotSame(runner.lastThreadUsed, Thread.currentThread());
 
+      runner.latch.setCount(1);
+      runner.lastThreadUsed = Thread.currentThread();
+
+      // force a recheck
+      counter.check();
+
+      runner.latch.await(5, TimeUnit.SECONDS);
+      assertNotSame(runner.lastThreadUsed, Thread.currentThread());
+
       executor.shutdown();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 14e13b1..587367b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
@@ -43,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -344,22 +347,40 @@ public class AMQPSessionCallback implements SessionCallback {
    public void closeSender(final Object brokerConsumer) throws Exception {
 
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
+      final CountDownLatch latch = new CountDownLatch(1);
 
-      serverSession.getSessionContext().executeOnCompletion(new IOCallback() {
+      Runnable runnable = new Runnable() {
          @Override
-         public void done() {
+         public void run() {
             try {
                consumer.close(false);
+               latch.countDown();
             } catch (Exception e) {
-               logger.warn(e.getMessage(), e);
             }
          }
+      };
 
-         @Override
-         public void onError(int errorCode, String errorMessage) {
+      // Due to the nature of proton, this could be happening within a flush from the queue delivery (depending on how it happened on the protocol).
+      // To avoid deadlocks, the close has to be done outside of the main thread, on an executor;
+      // otherwise you could get a deadlock.
+      Executor executor = protonSPI.getExeuctor();
+
+      if (executor != null) {
+         executor.execute(runnable);
+      } else {
+         runnable.run();
+      }
+
+      try {
+         // a short timeout will do.. 1 second is already long enough
+         if (!latch.await(1, TimeUnit.SECONDS)) {
+            logger.debug("Could not close consumer on time");
          }
-      });
+      } catch (InterruptedException e) {
+         throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
+      }
 
+      consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
    }
 
    public String tempQueueName() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 9a34837..844a49d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
@@ -298,4 +299,7 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void decDelivering(int size);
 
+   /** Performs the check on the reference counter again, once the given context completes. */
+   void recheckRefCount(OperationContext context);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 0f47af1..31a4869 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.QueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -2942,6 +2943,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
    }
 
+   @Override
+   public void recheckRefCount(OperationContext context) {
+      ReferenceCounter refCount = refCountForConsumers;
+      if (refCount != null) {
+         context.executeOnCompletion(new IOCallback() {
+            @Override
+            public void done() {
+               refCount.check();
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+
+            }
+         });
+      }
+
+   }
+
    // Inner classes
    // --------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index 82a700f..be83aca 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -19,71 +19,57 @@ package org.apache.activemq.artemis.core.server.impl;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.QueueManager;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueManager;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
 
-public class QueueManagerImpl implements QueueManager {
+public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManager {
 
    private final SimpleString queueName;
 
    private final ActiveMQServer server;
 
-   private final Runnable runnable = new Runnable() {
-      @Override
-      public void run() {
-         Queue queue = server.locateQueue(queueName);
-         //the queue may already have been deleted and this is a result of that
-         if (queue == null) {
-            if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQServerLogger.LOGGER.debug("pno queue to delete \"" + queueName + ".\"");
-            }
-            return;
+   private void doIt() {
+      Queue queue = server.locateQueue(queueName);
+      //the queue may already have been deleted and this is a result of that
+      if (queue == null) {
+         if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+            ActiveMQServerLogger.LOGGER.debug("pno queue to delete \"" + queueName + ".\"");
          }
-         SimpleString address = queue.getAddress();
-         AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
-         long consumerCount = queue.getConsumerCount();
-         long messageCount = queue.getMessageCount();
+         return;
+      }
+      SimpleString address = queue.getAddress();
+      AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+      long consumerCount = queue.getConsumerCount();
+      long messageCount = queue.getMessageCount();
 
-         if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) {
-            if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
-            }
+      if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0 && queue.getConsumerCount() == 0) {
+         if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+            ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
+         }
 
-            try {
-               server.destroyQueue(queueName, null, true, false);
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
-            }
-         } else if (queue.isPurgeOnNoConsumers()) {
-            if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
-            }
-            try {
-               queue.deleteAllReferences();
-            } catch (Exception e) {
-               ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
-            }
+         try {
+            server.destroyQueue(queueName, null, true, false);
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
+         }
+      } else if (queue.isPurgeOnNoConsumers()) {
+         if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+            ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
+         }
+         try {
+            queue.deleteAllReferences();
+         } catch (Exception e) {
+            ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
          }
       }
-   };
-
-   private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
+   }
 
    public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
       this.server = server;
       this.queueName = queueName;
-   }
-
-   @Override
-   public int increment() {
-      return referenceCounterUtil.increment();
-   }
-
-   @Override
-   public int decrement() {
-      return referenceCounterUtil.decrement();
+      this.setTask(this::doIt);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 8e64a21..36aa4e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -508,6 +508,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       tx.rollback();
 
+      messageQueue.recheckRefCount(session.getSessionContext());
+
       if (!browseOnly) {
          TypedProperties props = new TypedProperties();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
index 125c9fe..ab14479 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
@@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.server.TransientQueueManager;
 import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
 import org.jboss.logging.Logger;
 
-public class TransientQueueManagerImpl implements TransientQueueManager {
+public class TransientQueueManagerImpl extends ReferenceCounterUtil implements TransientQueueManager {
 
    private static final Logger logger = Logger.getLogger(TransientQueueManagerImpl.class);
 
@@ -32,41 +32,28 @@ public class TransientQueueManagerImpl implements TransientQueueManager {
 
    private final ActiveMQServer server;
 
-   private final Runnable runnable = new Runnable() {
-      @Override
-      public void run() {
-         try {
-            if (logger.isDebugEnabled()) {
-               logger.debug("deleting temporary queue " + queueName);
-            }
+   private void doIt() {
+      try {
+         if (logger.isDebugEnabled()) {
+            logger.debug("deleting temporary queue " + queueName);
+         }
 
-            try {
-               server.destroyQueue(queueName, null, false);
-            } catch (ActiveMQException e) {
-               ActiveMQServerLogger.LOGGER.errorOnDeletingQueue(queueName.toString(), e);
-            }
-         } catch (Exception e) {
-            ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName);
+         try {
+            server.destroyQueue(queueName, null, false);
+         } catch (ActiveMQException e) {
+            ActiveMQServerLogger.LOGGER.errorOnDeletingQueue(queueName.toString(), e);
          }
+      } catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName);
       }
-   };
-
-   private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
+   }
 
    public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
       this.server = server;
 
       this.queueName = queueName;
-   }
 
-   @Override
-   public int increment() {
-      return referenceCounterUtil.increment();
-   }
-
-   @Override
-   public int decrement() {
-      return referenceCounterUtil.decrement();
+      this.setTask(this::doIt);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index ddf702e..2707190 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -778,6 +779,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void recheckRefCount(OperationContext context) {
+      }
+
+      @Override
       public void unproposed(SimpleString groupID) {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
index 0a1a9d5..8ba922d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
@@ -42,11 +42,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
-import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
 import org.apache.qpid.jms.JmsConnectionFactory;
-import org.junit.Assert;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -61,19 +58,11 @@ public class TopicDurableTests extends JMSClientTestSupport {
 
    @Test
    public void testMessageDurableSubscription() throws Exception {
-      for (int i = 0; i < 100; i++) {
-         testLoop();
-         tearDown();
-         setUp();
-      }
-   }
-
-   private void testLoop() throws Exception {
       JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
       Connection connection = connectionFactory.createConnection();
       connection.start();
 
-      System.err.println("testMessageDurableSubscription");
+      System.out.println("testMessageDurableSubscription");
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
       Topic testTopic =  session.createTopic("jmsTopic");
 
@@ -87,39 +76,39 @@ public class TopicDurableTests extends JMSClientTestSupport {
       String batchPrefix = "First";
       List<Message> listMsgs = generateMessages(session, batchPrefix, count);
       sendMessages(messageProducer, listMsgs);
-      System.err.println("First batch messages sent");
+      System.out.println("First batch messages sent");
 
       List<Message> recvd1 = receiveMessages(subscriber1, count);
       List<Message> recvd2 = receiveMessages(subscriber2, count);
 
       assertThat(recvd1.size(), is(count));
       assertMessageContent(recvd1, batchPrefix);
-      System.err.println(sub1ID + " :First batch messages received");
+      System.out.println(sub1ID + " :First batch messages received");
 
       assertThat(recvd2.size(), is(count));
       assertMessageContent(recvd2, batchPrefix);
-      System.err.println(sub2ID + " :First batch messages received");
+      System.out.println(sub2ID + " :First batch messages received");
 
       subscriber1.close();
-      System.err.println(sub1ID + " : closed");
+      System.out.println(sub1ID + " : closed");
 
       batchPrefix = "Second";
       listMsgs = generateMessages(session, batchPrefix, count);
       sendMessages(messageProducer, listMsgs);
-      System.err.println("Second batch messages sent");
+      System.out.println("Second batch messages sent");
 
       recvd2 = receiveMessages(subscriber2, count);
       assertThat(recvd2.size(), is(count));
       assertMessageContent(recvd2, batchPrefix);
-      System.err.println(sub2ID + " :Second batch messages received");
+      System.out.println(sub2ID + " :Second batch messages received");
 
       subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
-      System.err.println(sub1ID + " :connected");
+      System.out.println(sub1ID + " :connected");
 
       recvd1 = receiveMessages(subscriber1, count);
       assertThat(recvd1.size(), is(count));
       assertMessageContent(recvd1, batchPrefix);
-      System.err.println(sub1ID + " :Second batch messages received");
+      System.out.println(sub1ID + " :Second batch messages received");
 
       subscriber1.close();
       subscriber2.close();
@@ -131,9 +120,9 @@ public class TopicDurableTests extends JMSClientTestSupport {
 
    @Test
    public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException {
-      int iterations = 100;
+      int iterations = 10;
       for (int i = 0; i < iterations; i++) {
-         System.err.println("testSharedNonDurableSubscription; iteration: " + i);
+         System.out.println("testSharedNonDurableSubscription; iteration: " + i);
          //SETUP-START
          JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
          Connection connection1 = connectionFactory1.createConnection();
@@ -167,14 +156,14 @@ public class TopicDurableTests extends JMSClientTestSupport {
          List<Message> listMsgs = generateMessages(session, count);
          List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3);
          sendMessages(messageProducer, listMsgs);
-         System.err.println("messages sent");
+         System.out.println("messages sent");
 
          assertThat("Each message should be received only by one consumer",
                     results.get(0).get(20, TimeUnit.SECONDS).size() +
                        results.get(1).get(20, TimeUnit.SECONDS).size() +
                        results.get(2).get(20, TimeUnit.SECONDS).size(),
                     is(count));
-         System.err.println("messages received");
+         System.out.println("messages received");
          //BODY-E
 
          //TEAR-DOWN-S
@@ -255,7 +244,7 @@ public class TopicDurableTests extends JMSClientTestSupport {
          resultsList.add(new CompletableFuture<>());
          receivedResList.add(new ArrayList<>());
          MessageListener myListener = message -> {
-            System.err.println("Mesages received" + message + " count: " + totalCount.get());
+            System.out.println("Mesages received" + message + " count: " + totalCount.get());
             receivedResList.get(index).add(message);
             if (totalCount.decrementAndGet() == 0) {
                for (int j = 0; j < consumer.length; j++) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index ef53344..0b36e18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -266,7 +266,21 @@ public class ConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
-   public void testAutoCreateCOnConsumer() throws Throwable {
+   public void testAutoCreateCOnConsumerAMQP() throws Throwable {
+      testAutoCreate(2);
+   }
+
+   @Test
+   public void testAutoCreateCOnConsumerCore() throws Throwable {
+      testAutoCreate(1);
+   }
+
+   @Test
+   public void testAutoCreateCOnConsumerOpenWire() throws Throwable {
+      testAutoCreate(3);
+   }
+
+   private void testAutoCreate(int protocol) throws Throwable {
 
       final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue");
       if (!isNetty()) {
@@ -275,7 +289,7 @@ public class ConsumerTest extends ActiveMQTestBase {
       }
 
       for (int i = 0; i < 10; i++) {
-         ConnectionFactory factorySend = createFactory(2);
+         ConnectionFactory factorySend = createFactory(protocol);
          Connection connection = factorySend.createConnection();
 
          try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 54cae7b..f654ed5 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -76,6 +77,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void recheckRefCount(OperationContext context) {
+
+   }
+
+   @Override
    public boolean isPersistedPause() {
       return false;
    }


[2/3] activemq-artemis git commit: ARTEMIS-1529 Adding test on durable topics

Posted by jb...@apache.org.
ARTEMIS-1529 Adding test on durable topics


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dbb3aadd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dbb3aadd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dbb3aadd

Branch: refs/heads/master
Commit: dbb3aaddf65f07c5ef77ace3a7efeb3f83632d8a
Parents: 4584ac6
Author: Tomas Kratky <tk...@redhat.com>
Authored: Tue Nov 28 16:37:46 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 09:45:09 2017 -0500

----------------------------------------------------------------------
 .../integration/amqp/TopicDurableTests.java     | 270 +++++++++++++++++++
 1 file changed, 270 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbb3aadd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
new file mode 100644
index 0000000..0a1a9d5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class TopicDurableTests extends JMSClientTestSupport {
+
+   @Override
+   protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+      // do not create unnecessary queues
+   }
+
+
+   @Test
+   public void testMessageDurableSubscription() throws Exception {
+      for (int i = 0; i < 100; i++) {
+         testLoop();
+         tearDown();
+         setUp();
+      }
+   }
+
+   private void testLoop() throws Exception {
+      JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
+      Connection connection = connectionFactory.createConnection();
+      connection.start();
+
+      System.err.println("testMessageDurableSubscription");
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Topic testTopic =  session.createTopic("jmsTopic");
+
+      String sub1ID = "sub1DurSub";
+      String sub2ID = "sub2DurSub";
+      MessageConsumer subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
+      MessageConsumer subscriber2 = session.createDurableSubscriber(testTopic, sub2ID);
+      MessageProducer messageProducer = session.createProducer(testTopic);
+
+      int count = 100;
+      String batchPrefix = "First";
+      List<Message> listMsgs = generateMessages(session, batchPrefix, count);
+      sendMessages(messageProducer, listMsgs);
+      System.err.println("First batch messages sent");
+
+      List<Message> recvd1 = receiveMessages(subscriber1, count);
+      List<Message> recvd2 = receiveMessages(subscriber2, count);
+
+      assertThat(recvd1.size(), is(count));
+      assertMessageContent(recvd1, batchPrefix);
+      System.err.println(sub1ID + " :First batch messages received");
+
+      assertThat(recvd2.size(), is(count));
+      assertMessageContent(recvd2, batchPrefix);
+      System.err.println(sub2ID + " :First batch messages received");
+
+      subscriber1.close();
+      System.err.println(sub1ID + " : closed");
+
+      batchPrefix = "Second";
+      listMsgs = generateMessages(session, batchPrefix, count);
+      sendMessages(messageProducer, listMsgs);
+      System.err.println("Second batch messages sent");
+
+      recvd2 = receiveMessages(subscriber2, count);
+      assertThat(recvd2.size(), is(count));
+      assertMessageContent(recvd2, batchPrefix);
+      System.err.println(sub2ID + " :Second batch messages received");
+
+      subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
+      System.err.println(sub1ID + " :connected");
+
+      recvd1 = receiveMessages(subscriber1, count);
+      assertThat(recvd1.size(), is(count));
+      assertMessageContent(recvd1, batchPrefix);
+      System.err.println(sub1ID + " :Second batch messages received");
+
+      subscriber1.close();
+      subscriber2.close();
+
+      session.unsubscribe(sub1ID);
+      session.unsubscribe(sub2ID);
+   }
+
+
+   @Test
+   public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException {
+      int iterations = 100;
+      for (int i = 0; i < iterations; i++) {
+         System.err.println("testSharedNonDurableSubscription; iteration: " + i);
+         //SETUP-START
+         JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
+         Connection connection1 = connectionFactory1.createConnection();
+
+
+         Hashtable env2 = new Hashtable<Object, Object>();
+         env2.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
+         env2.put("connectionfactory.qpidConnectionFactory", "amqp://localhost:5672");
+         env2.put("topic." + "jmsTopic", "jmsTopic");
+         Context context2 = new InitialContext(env2);
+         ConnectionFactory connectionFactory2 = (ConnectionFactory) context2.lookup("qpidConnectionFactory");
+         Connection connection2 = connectionFactory2.createConnection();
+
+         connection1.start();
+         connection2.start();
+
+         Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic testTopic = session.createTopic("jmsTopic");
+         //SETUP-END
+
+         //BODY-S
+         String subID = "sharedConsumerNonDurable123";
+         MessageConsumer subscriber1 = session.createSharedConsumer(testTopic, subID);
+         MessageConsumer subscriber2 = session2.createSharedConsumer(testTopic, subID);
+         MessageConsumer subscriber3 = session2.createSharedConsumer(testTopic, subID);
+         MessageProducer messageProducer = session.createProducer(testTopic);
+         messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+         int count = 10;
+         List<Message> listMsgs = generateMessages(session, count);
+         List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3);
+         sendMessages(messageProducer, listMsgs);
+         System.err.println("messages sent");
+
+         assertThat("Each message should be received only by one consumer",
+                    results.get(0).get(20, TimeUnit.SECONDS).size() +
+                       results.get(1).get(20, TimeUnit.SECONDS).size() +
+                       results.get(2).get(20, TimeUnit.SECONDS).size(),
+                    is(count));
+         System.err.println("messages received");
+         //BODY-E
+
+         //TEAR-DOWN-S
+         connection1.stop();
+         connection2.stop();
+         subscriber1.close();
+         subscriber2.close();
+         session.close();
+         session2.close();
+         connection1.close();
+         connection2.close();
+         //TEAR-DOWN-E
+      }
+   }
+
+
+   private void sendMessages(MessageProducer producer, List<Message> messages) {
+      messages.forEach(m -> {
+         try {
+            producer.send(m);
+         } catch (JMSException e) {
+            e.printStackTrace();
+         }
+      });
+   }
+
+   protected List<Message> receiveMessages(MessageConsumer consumer, int count) {
+      return receiveMessages(consumer, count, 0);
+   }
+
+   protected List<Message> receiveMessages(MessageConsumer consumer, int count, long timeout) {
+      List<Message> recvd = new ArrayList<>();
+      IntStream.range(0, count).forEach(i -> {
+         try {
+            recvd.add(timeout > 0 ? consumer.receive(timeout) : consumer.receive());
+         } catch (JMSException e) {
+            e.printStackTrace();
+         }
+      });
+      return recvd;
+   }
+
+   protected void assertMessageContent(List<Message> msgs, String content) {
+      msgs.forEach(m -> {
+         try {
+            assertTrue(((TextMessage) m).getText().contains(content));
+         } catch (JMSException e) {
+            e.printStackTrace();
+         }
+      });
+   }
+
+   protected List<Message> generateMessages(Session session, int count) {
+      return generateMessages(session, "", count);
+   }
+
+   protected List<Message> generateMessages(Session session, String prefix, int count) {
+      List<Message> messages = new ArrayList<>();
+      StringBuilder sb = new StringBuilder();
+      IntStream.range(0, count).forEach(i -> {
+         try {
+            messages.add(session.createTextMessage(sb.append(prefix).append("testMessage").append(i).toString()));
+            sb.setLength(0);
+         } catch (JMSException e) {
+            e.printStackTrace();
+         }
+      });
+      return messages;
+   }
+
+   protected List<CompletableFuture<List<Message>>> receiveMessagesAsync(int count, MessageConsumer... consumer) throws JMSException {
+      AtomicInteger totalCount = new AtomicInteger(count);
+      List<CompletableFuture<List<Message>>> resultsList = new ArrayList<>();
+      List<List<Message>> receivedResList = new ArrayList<>();
+
+      for (int i = 0; i < consumer.length; i++) {
+         final int index = i;
+         resultsList.add(new CompletableFuture<>());
+         receivedResList.add(new ArrayList<>());
+         MessageListener myListener = message -> {
+            System.err.println("Mesages received" + message + " count: " + totalCount.get());
+            receivedResList.get(index).add(message);
+            if (totalCount.decrementAndGet() == 0) {
+               for (int j = 0; j < consumer.length; j++) {
+                  resultsList.get(j).complete(receivedResList.get(j));
+               }
+            }
+         };
+         consumer[i].setMessageListener(myListener);
+      }
+      return resultsList;
+   }
+}


[3/3] activemq-artemis git commit: This closes #1678

Posted by jb...@apache.org.
This closes #1678


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a822af47
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a822af47
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a822af47

Branch: refs/heads/master
Commit: a822af4712a5a10f0c32ac9a0f7dbe28bd6eebad
Parents: 4584ac6 8b7282d
Author: Justin Bertram <jb...@apache.org>
Authored: Wed Nov 29 10:49:36 2017 -0600
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed Nov 29 10:49:36 2017 -0600

----------------------------------------------------------------------
 .../artemis/utils/ReferenceCounter.java         |  15 ++
 .../artemis/utils/ReferenceCounterUtil.java     |  54 +++-
 .../artemis/utils/ReferenceCounterTest.java     |  15 +-
 .../amqp/broker/AMQPSessionCallback.java        |  33 ++-
 .../activemq/artemis/core/server/Queue.java     |   4 +
 .../artemis/core/server/impl/QueueImpl.java     |  20 ++
 .../core/server/impl/QueueManagerImpl.java      |  80 +++---
 .../core/server/impl/ServerConsumerImpl.java    |   2 +
 .../server/impl/TransientQueueManagerImpl.java  |  41 +--
 .../impl/ScheduledDeliveryHandlerTest.java      |   5 +
 .../integration/amqp/TopicDurableTests.java     | 259 +++++++++++++++++++
 .../tests/integration/client/ConsumerTest.java  |  18 +-
 .../unit/core/postoffice/impl/FakeQueue.java    |   6 +
 13 files changed, 458 insertions(+), 94 deletions(-)
----------------------------------------------------------------------