You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/11/29 16:52:13 UTC
[1/3] activemq-artemis git commit: ARTEMIS-1529 Fixing Ref count over asynchronous ack
Repository: activemq-artemis
Updated Branches:
refs/heads/master 4584ac697 -> a822af471
ARTEMIS-1529 Fixing Ref count over asynchronous ack
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8b7282d8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8b7282d8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8b7282d8
Branch: refs/heads/master
Commit: 8b7282d849fca896f6b9794a5ddfc251db947120
Parents: dbb3aad
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Nov 28 21:08:05 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 09:45:09 2017 -0500
----------------------------------------------------------------------
.../artemis/utils/ReferenceCounter.java | 15 ++++
.../artemis/utils/ReferenceCounterUtil.java | 54 ++++++++++---
.../artemis/utils/ReferenceCounterTest.java | 15 +++-
.../amqp/broker/AMQPSessionCallback.java | 33 ++++++--
.../activemq/artemis/core/server/Queue.java | 4 +
.../artemis/core/server/impl/QueueImpl.java | 20 +++++
.../core/server/impl/QueueManagerImpl.java | 80 ++++++++------------
.../core/server/impl/ServerConsumerImpl.java | 2 +
.../server/impl/TransientQueueManagerImpl.java | 41 ++++------
.../impl/ScheduledDeliveryHandlerTest.java | 5 ++
.../integration/amqp/TopicDurableTests.java | 39 ++++------
.../tests/integration/client/ConsumerTest.java | 18 ++++-
.../unit/core/postoffice/impl/FakeQueue.java | 6 ++
13 files changed, 213 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java
index 2f46fb1..423b6b4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounter.java
@@ -21,4 +21,19 @@ public interface ReferenceCounter {
int increment();
int decrement();
+
+ int getCount();
+
+
+ void setTask(Runnable task);
+
+ Runnable getTask();
+
+ /**
+ * Some asynchronous operations (like ack) may delay the moment at which certain conditions are met.
+ * Once they are met, during afterCompletion, we may need to recheck certain values
+ * to make sure we do not end up in a situation where the condition was met asynchronously but the queues were not removed.
+ */
+ void check();
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java
index 3f971fd..3ef97a9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ReferenceCounterUtil.java
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ReferenceCounterUtil implements ReferenceCounter {
- private final Runnable runnable;
+ private Runnable task;
/**
* If executor is null the runnable will be called within the same thread, otherwise the executor will be used
@@ -30,15 +30,35 @@ public class ReferenceCounterUtil implements ReferenceCounter {
private final AtomicInteger uses = new AtomicInteger(0);
- public ReferenceCounterUtil(Runnable runnable) {
- this(runnable, null);
+ public ReferenceCounterUtil() {
+ this.executor = null;
+ this.task = null;
+ }
+
+ public ReferenceCounterUtil(Executor executor) {
+ this.executor = executor;
}
public ReferenceCounterUtil(Runnable runnable, Executor executor) {
- this.runnable = runnable;
+ this.setTask(runnable);
this.executor = executor;
}
+ public ReferenceCounterUtil(Runnable runnable) {
+ this.setTask(runnable);
+ this.executor = null;
+ }
+
+ @Override
+ public void setTask(Runnable task) {
+ this.task = task;
+ }
+
+ @Override
+ public Runnable getTask() {
+ return task;
+ }
+
@Override
public int increment() {
return uses.incrementAndGet();
@@ -48,13 +68,29 @@ public class ReferenceCounterUtil implements ReferenceCounter {
public int decrement() {
int value = uses.decrementAndGet();
if (value == 0) {
- if (executor != null) {
- executor.execute(runnable);
- } else {
- runnable.run();
- }
+ execute();
}
return value;
}
+
+ private void execute() {
+ if (executor != null) {
+ executor.execute(task);
+ } else {
+ task.run();
+ }
+ }
+
+ @Override
+ public void check() {
+ if (getCount() <= 0) {
+ execute();
+ }
+ }
+
+ @Override
+ public int getCount() {
+ return uses.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java
index 865afff..7dbc9fb 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/ReferenceCounterTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.activemq.artemis.utils;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -30,12 +29,13 @@ public class ReferenceCounterTest extends Assert {
class LatchRunner implements Runnable {
- final CountDownLatch latch = new CountDownLatch(1);
+ final ReusableLatch latch = new ReusableLatch(1);
final AtomicInteger counts = new AtomicInteger(0);
- volatile Thread lastThreadUsed;
+ volatile Thread lastThreadUsed = Thread.currentThread();
@Override
public void run() {
+ lastThreadUsed = Thread.currentThread();
counts.incrementAndGet();
latch.countDown();
}
@@ -65,6 +65,15 @@ public class ReferenceCounterTest extends Assert {
assertNotSame(runner.lastThreadUsed, Thread.currentThread());
+ runner.latch.setCount(1);
+ runner.lastThreadUsed = Thread.currentThread();
+
+ // force a recheck
+ counter.check();
+
+ runner.latch.await(5, TimeUnit.SECONDS);
+ assertNotSame(runner.lastThreadUsed, Thread.currentThread());
+
executor.shutdown();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 14e13b1..587367b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -16,7 +16,9 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
@@ -43,6 +45,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
+import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -344,22 +347,40 @@ public class AMQPSessionCallback implements SessionCallback {
public void closeSender(final Object brokerConsumer) throws Exception {
final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
+ final CountDownLatch latch = new CountDownLatch(1);
- serverSession.getSessionContext().executeOnCompletion(new IOCallback() {
+ Runnable runnable = new Runnable() {
@Override
- public void done() {
+ public void run() {
try {
consumer.close(false);
+ latch.countDown();
} catch (Exception e) {
- logger.warn(e.getMessage(), e);
}
}
+ };
- @Override
- public void onError(int errorCode, String errorMessage) {
+ // Due to the nature of proton, this could be happening within flushes from the queue-delivery
+ // (depending on how it happened on the protocol). To avoid deadlocks, the close has to be done
+ // outside of the main thread, on an executor.
+ Executor executor = protonSPI.getExeuctor();
+
+ if (executor != null) {
+ executor.execute(runnable);
+ } else {
+ runnable.run();
+ }
+
+ try {
+ // a short timeout will do.. 1 second is already long enough
+ if (!latch.await(1, TimeUnit.SECONDS)) {
+ logger.debug("Could not close consumer on time");
}
- });
+ } catch (InterruptedException e) {
+ throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue());
+ }
+ consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
}
public String tempQueueName() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 9a34837..844a49d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter;
@@ -298,4 +299,7 @@ public interface Queue extends Bindable,CriticalComponent {
void decDelivering(int size);
+ /** This is to perform a check on the counter again */
+ void recheckRefCount(OperationContext context);
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 0f47af1..31a4869 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
@@ -2942,6 +2943,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
}
+ @Override
+ public void recheckRefCount(OperationContext context) {
+ ReferenceCounter refCount = refCountForConsumers;
+ if (refCount != null) {
+ context.executeOnCompletion(new IOCallback() {
+ @Override
+ public void done() {
+ refCount.check();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+
+ }
+ });
+ }
+
+ }
+
// Inner classes
// --------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index 82a700f..be83aca 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -19,71 +19,57 @@ package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.QueueManager;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.QueueManager;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
-public class QueueManagerImpl implements QueueManager {
+public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManager {
private final SimpleString queueName;
private final ActiveMQServer server;
- private final Runnable runnable = new Runnable() {
- @Override
- public void run() {
- Queue queue = server.locateQueue(queueName);
- //the queue may already have been deleted and this is a result of that
- if (queue == null) {
- if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("pno queue to delete \"" + queueName + ".\"");
- }
- return;
+ private void doIt() {
+ Queue queue = server.locateQueue(queueName);
+ //the queue may already have been deleted and this is a result of that
+ if (queue == null) {
+ if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+ ActiveMQServerLogger.LOGGER.debug("pno queue to delete \"" + queueName + ".\"");
}
- SimpleString address = queue.getAddress();
- AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
- long consumerCount = queue.getConsumerCount();
- long messageCount = queue.getMessageCount();
+ return;
+ }
+ SimpleString address = queue.getAddress();
+ AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+ long consumerCount = queue.getConsumerCount();
+ long messageCount = queue.getMessageCount();
- if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0) {
- if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
- }
+ if (queue.isAutoCreated() && settings.isAutoDeleteQueues() && queue.getMessageCount() == 0 && queue.getConsumerCount() == 0) {
+ if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+ ActiveMQServerLogger.LOGGER.debug("deleting " + (queue.isAutoCreated() ? "auto-created " : "") + "queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount + "; isAutoDeleteQueues = " + settings.isAutoDeleteQueues());
+ }
- try {
- server.destroyQueue(queueName, null, true, false);
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
- }
- } else if (queue.isPurgeOnNoConsumers()) {
- if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
- }
- try {
- queue.deleteAllReferences();
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
- }
+ try {
+ server.destroyQueue(queueName, null, true, false);
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorRemovingAutoCreatedQueue(e, queueName);
+ }
+ } else if (queue.isPurgeOnNoConsumers()) {
+ if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
+ ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queueName + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
+ }
+ try {
+ queue.deleteAllReferences();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.failedToPurgeQueue(e, queueName);
}
}
- };
-
- private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
+ }
public QueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
this.server = server;
this.queueName = queueName;
- }
-
- @Override
- public int increment() {
- return referenceCounterUtil.increment();
- }
-
- @Override
- public int decrement() {
- return referenceCounterUtil.decrement();
+ this.setTask(this::doIt);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 8e64a21..36aa4e2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -508,6 +508,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
tx.rollback();
+ messageQueue.recheckRefCount(session.getSessionContext());
+
if (!browseOnly) {
TypedProperties props = new TypedProperties();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
index 125c9fe..ab14479 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/TransientQueueManagerImpl.java
@@ -24,7 +24,7 @@ import org.apache.activemq.artemis.core.server.TransientQueueManager;
import org.apache.activemq.artemis.utils.ReferenceCounterUtil;
import org.jboss.logging.Logger;
-public class TransientQueueManagerImpl implements TransientQueueManager {
+public class TransientQueueManagerImpl extends ReferenceCounterUtil implements TransientQueueManager {
private static final Logger logger = Logger.getLogger(TransientQueueManagerImpl.class);
@@ -32,41 +32,28 @@ public class TransientQueueManagerImpl implements TransientQueueManager {
private final ActiveMQServer server;
- private final Runnable runnable = new Runnable() {
- @Override
- public void run() {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("deleting temporary queue " + queueName);
- }
+ private void doIt() {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("deleting temporary queue " + queueName);
+ }
- try {
- server.destroyQueue(queueName, null, false);
- } catch (ActiveMQException e) {
- ActiveMQServerLogger.LOGGER.errorOnDeletingQueue(queueName.toString(), e);
- }
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName);
+ try {
+ server.destroyQueue(queueName, null, false);
+ } catch (ActiveMQException e) {
+ ActiveMQServerLogger.LOGGER.errorOnDeletingQueue(queueName.toString(), e);
}
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorRemovingTempQueue(e, queueName);
}
- };
-
- private final ReferenceCounterUtil referenceCounterUtil = new ReferenceCounterUtil(runnable);
+ }
public TransientQueueManagerImpl(ActiveMQServer server, SimpleString queueName) {
this.server = server;
this.queueName = queueName;
- }
- @Override
- public int increment() {
- return referenceCounterUtil.increment();
- }
-
- @Override
- public int decrement() {
- return referenceCounterUtil.decrement();
+ this.setTask(this::doIt);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index ddf702e..2707190 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -778,6 +779,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void recheckRefCount(OperationContext context) {
+ }
+
+ @Override
public void unproposed(SimpleString groupID) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
index 0a1a9d5..8ba922d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
@@ -42,11 +42,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.qpid.jms.JmsConnectionFactory;
-import org.junit.Assert;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
@@ -61,19 +58,11 @@ public class TopicDurableTests extends JMSClientTestSupport {
@Test
public void testMessageDurableSubscription() throws Exception {
- for (int i = 0; i < 100; i++) {
- testLoop();
- tearDown();
- setUp();
- }
- }
-
- private void testLoop() throws Exception {
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
Connection connection = connectionFactory.createConnection();
connection.start();
- System.err.println("testMessageDurableSubscription");
+ System.out.println("testMessageDurableSubscription");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic testTopic = session.createTopic("jmsTopic");
@@ -87,39 +76,39 @@ public class TopicDurableTests extends JMSClientTestSupport {
String batchPrefix = "First";
List<Message> listMsgs = generateMessages(session, batchPrefix, count);
sendMessages(messageProducer, listMsgs);
- System.err.println("First batch messages sent");
+ System.out.println("First batch messages sent");
List<Message> recvd1 = receiveMessages(subscriber1, count);
List<Message> recvd2 = receiveMessages(subscriber2, count);
assertThat(recvd1.size(), is(count));
assertMessageContent(recvd1, batchPrefix);
- System.err.println(sub1ID + " :First batch messages received");
+ System.out.println(sub1ID + " :First batch messages received");
assertThat(recvd2.size(), is(count));
assertMessageContent(recvd2, batchPrefix);
- System.err.println(sub2ID + " :First batch messages received");
+ System.out.println(sub2ID + " :First batch messages received");
subscriber1.close();
- System.err.println(sub1ID + " : closed");
+ System.out.println(sub1ID + " : closed");
batchPrefix = "Second";
listMsgs = generateMessages(session, batchPrefix, count);
sendMessages(messageProducer, listMsgs);
- System.err.println("Second batch messages sent");
+ System.out.println("Second batch messages sent");
recvd2 = receiveMessages(subscriber2, count);
assertThat(recvd2.size(), is(count));
assertMessageContent(recvd2, batchPrefix);
- System.err.println(sub2ID + " :Second batch messages received");
+ System.out.println(sub2ID + " :Second batch messages received");
subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
- System.err.println(sub1ID + " :connected");
+ System.out.println(sub1ID + " :connected");
recvd1 = receiveMessages(subscriber1, count);
assertThat(recvd1.size(), is(count));
assertMessageContent(recvd1, batchPrefix);
- System.err.println(sub1ID + " :Second batch messages received");
+ System.out.println(sub1ID + " :Second batch messages received");
subscriber1.close();
subscriber2.close();
@@ -131,9 +120,9 @@ public class TopicDurableTests extends JMSClientTestSupport {
@Test
public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException {
- int iterations = 100;
+ int iterations = 10;
for (int i = 0; i < iterations; i++) {
- System.err.println("testSharedNonDurableSubscription; iteration: " + i);
+ System.out.println("testSharedNonDurableSubscription; iteration: " + i);
//SETUP-START
JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
Connection connection1 = connectionFactory1.createConnection();
@@ -167,14 +156,14 @@ public class TopicDurableTests extends JMSClientTestSupport {
List<Message> listMsgs = generateMessages(session, count);
List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3);
sendMessages(messageProducer, listMsgs);
- System.err.println("messages sent");
+ System.out.println("messages sent");
assertThat("Each message should be received only by one consumer",
results.get(0).get(20, TimeUnit.SECONDS).size() +
results.get(1).get(20, TimeUnit.SECONDS).size() +
results.get(2).get(20, TimeUnit.SECONDS).size(),
is(count));
- System.err.println("messages received");
+ System.out.println("messages received");
//BODY-E
//TEAR-DOWN-S
@@ -255,7 +244,7 @@ public class TopicDurableTests extends JMSClientTestSupport {
resultsList.add(new CompletableFuture<>());
receivedResList.add(new ArrayList<>());
MessageListener myListener = message -> {
- System.err.println("Mesages received" + message + " count: " + totalCount.get());
+ System.out.println("Mesages received" + message + " count: " + totalCount.get());
receivedResList.get(index).add(message);
if (totalCount.decrementAndGet() == 0) {
for (int j = 0; j < consumer.length; j++) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index ef53344..0b36e18 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -266,7 +266,21 @@ public class ConsumerTest extends ActiveMQTestBase {
}
@Test
- public void testAutoCreateCOnConsumer() throws Throwable {
+ public void testAutoCreateCOnConsumerAMQP() throws Throwable {
+ testAutoCreate(2);
+ }
+
+ @Test
+ public void testAutoCreateCOnConsumerCore() throws Throwable {
+ testAutoCreate(1);
+ }
+
+ @Test
+ public void testAutoCreateCOnConsumerOpenWire() throws Throwable {
+ testAutoCreate(3);
+ }
+
+ private void testAutoCreate(int protocol) throws Throwable {
final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue");
if (!isNetty()) {
@@ -275,7 +289,7 @@ public class ConsumerTest extends ActiveMQTestBase {
}
for (int i = 0; i < 10; i++) {
- ConnectionFactory factorySend = createFactory(2);
+ ConnectionFactory factorySend = createFactory(protocol);
Connection connection = factorySend.createConnection();
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8b7282d8/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 54cae7b..f654ed5 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@@ -76,6 +77,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public void recheckRefCount(OperationContext context) {
+
+ }
+
+ @Override
public boolean isPersistedPause() {
return false;
}
[2/3] activemq-artemis git commit: ARTEMIS-1529 Adding test on
durable topics
Posted by jb...@apache.org.
ARTEMIS-1529 Adding test on durable topics
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dbb3aadd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dbb3aadd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dbb3aadd
Branch: refs/heads/master
Commit: dbb3aaddf65f07c5ef77ace3a7efeb3f83632d8a
Parents: 4584ac6
Author: Tomas Kratky <tk...@redhat.com>
Authored: Tue Nov 28 16:37:46 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 09:45:09 2017 -0500
----------------------------------------------------------------------
.../integration/amqp/TopicDurableTests.java | 270 +++++++++++++++++++
1 file changed, 270 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbb3aadd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
new file mode 100644
index 0000000..0a1a9d5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class TopicDurableTests extends JMSClientTestSupport {
+
+ /**
+  * Overrides the inherited hook with an empty body so that this test class
+  * does not have any addresses or queues created up front for {@code server}.
+  * NOTE(review): presumably the parent class pre-creates test queues here --
+  * confirm against JMSClientTestSupport.
+  */
+ @Override
+ protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+ // do not create unnecessary queues
+ }
+
+
+ /**
+  * Repeats the durable-subscription scenario implemented by {@link #testLoop()}
+  * 100 times, invoking {@code tearDown()} followed by {@code setUp()} after
+  * every pass so that each pass starts from a freshly set-up fixture.
+  */
+ @Test
+ public void testMessageDurableSubscription() throws Exception {
+    int remaining = 100;
+    while (remaining-- > 0) {
+       testLoop();
+       tearDown();
+       setUp();
+    }
+ }
+
+ /**
+  * Runs one pass of the durable-subscription scenario on topic {@code jmsTopic}
+  * using client id {@code jmsTopicClient}:
+  * <ol>
+  * <li>two durable subscribers are created and both must receive a first batch
+  * of 100 messages;</li>
+  * <li>the first subscriber is closed, a second batch is sent, and the second
+  * subscriber must receive it;</li>
+  * <li>the first subscriber is re-created under the same subscription name and
+  * must receive the second batch as well;</li>
+  * <li>both subscribers are closed and both subscriptions are unsubscribed.</li>
+  * </ol>
+  * The connection is always closed on exit, including when an assertion fails,
+  * so that repeated passes do not accumulate open client connections.
+  *
+  * @throws Exception if any JMS operation or assertion fails
+  */
+ private void testLoop() throws Exception {
+    JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
+    Connection connection = connectionFactory.createConnection();
+    try {
+       connection.start();
+
+       System.err.println("testMessageDurableSubscription");
+       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+       Topic testTopic = session.createTopic("jmsTopic");
+
+       String sub1ID = "sub1DurSub";
+       String sub2ID = "sub2DurSub";
+       MessageConsumer subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
+       MessageConsumer subscriber2 = session.createDurableSubscriber(testTopic, sub2ID);
+       MessageProducer messageProducer = session.createProducer(testTopic);
+
+       int count = 100;
+       String batchPrefix = "First";
+       List<Message> listMsgs = generateMessages(session, batchPrefix, count);
+       sendMessages(messageProducer, listMsgs);
+       System.err.println("First batch messages sent");
+
+       List<Message> recvd1 = receiveMessages(subscriber1, count);
+       List<Message> recvd2 = receiveMessages(subscriber2, count);
+
+       assertThat(recvd1.size(), is(count));
+       assertMessageContent(recvd1, batchPrefix);
+       System.err.println(sub1ID + " :First batch messages received");
+
+       assertThat(recvd2.size(), is(count));
+       assertMessageContent(recvd2, batchPrefix);
+       System.err.println(sub2ID + " :First batch messages received");
+
+       // Close only the first subscriber; its durable subscription stays
+       // registered and is re-attached further down.
+       subscriber1.close();
+       System.err.println(sub1ID + " : closed");
+
+       batchPrefix = "Second";
+       listMsgs = generateMessages(session, batchPrefix, count);
+       sendMessages(messageProducer, listMsgs);
+       System.err.println("Second batch messages sent");
+
+       recvd2 = receiveMessages(subscriber2, count);
+       assertThat(recvd2.size(), is(count));
+       assertMessageContent(recvd2, batchPrefix);
+       System.err.println(sub2ID + " :Second batch messages received");
+
+       // Re-attach to the first subscription: the second batch was sent while
+       // no consumer was attached to it.
+       subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
+       System.err.println(sub1ID + " :connected");
+
+       recvd1 = receiveMessages(subscriber1, count);
+       assertThat(recvd1.size(), is(count));
+       assertMessageContent(recvd1, batchPrefix);
+       System.err.println(sub1ID + " :Second batch messages received");
+
+       subscriber1.close();
+       subscriber2.close();
+
+       session.unsubscribe(sub1ID);
+       session.unsubscribe(sub2ID);
+    } finally {
+       // Always release the client connection; the caller runs this method in
+       // a loop and would otherwise leak one connection per pass.
+       connection.close();
+    }
+ }
+
+
+ /**
+  * Verifies, over 100 iterations, that messages published to {@code jmsTopic}
+  * are distributed across three consumers sharing the non-durable subscription
+  * {@code sharedConsumerNonDurable123} (one on the first connection, two on the
+  * second) so that the three consumers together receive exactly as many
+  * messages as were sent.
+  * <p>
+  * Both connections are opened in a try-with-resources statement so they are
+  * closed even if an assertion fails or a future times out.
+  */
+ @Test
+ public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException {
+    int iterations = 100;
+    for (int i = 0; i < iterations; i++) {
+       System.err.println("testSharedNonDurableSubscription; iteration: " + i);
+       //SETUP-START
+       JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
+
+       // The second factory is deliberately obtained through JNDI.
+       // NOTE(review): the broker URL below is hard-coded while the first
+       // factory uses getBrokerQpidJMSConnectionURI() -- confirm they always
+       // point at the same broker.
+       Hashtable<Object, Object> env2 = new Hashtable<>();
+       env2.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
+       env2.put("connectionfactory.qpidConnectionFactory", "amqp://localhost:5672");
+       env2.put("topic." + "jmsTopic", "jmsTopic");
+       ConnectionFactory connectionFactory2;
+       Context context2 = new InitialContext(env2);
+       try {
+          connectionFactory2 = (ConnectionFactory) context2.lookup("qpidConnectionFactory");
+       } finally {
+          context2.close();
+       }
+
+       try (Connection connection1 = connectionFactory1.createConnection();
+            Connection connection2 = connectionFactory2.createConnection()) {
+
+          connection1.start();
+          connection2.start();
+
+          Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+          Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+          Topic testTopic = session.createTopic("jmsTopic");
+          //SETUP-END
+
+          //BODY-S
+          String subID = "sharedConsumerNonDurable123";
+          MessageConsumer subscriber1 = session.createSharedConsumer(testTopic, subID);
+          MessageConsumer subscriber2 = session2.createSharedConsumer(testTopic, subID);
+          MessageConsumer subscriber3 = session2.createSharedConsumer(testTopic, subID);
+          MessageProducer messageProducer = session.createProducer(testTopic);
+          messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+          int count = 10;
+          List<Message> listMsgs = generateMessages(session, count);
+          // Listeners are registered before anything is sent.
+          List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3);
+          sendMessages(messageProducer, listMsgs);
+          System.err.println("messages sent");
+
+          assertThat("Each message should be received only by one consumer",
+                     results.get(0).get(20, TimeUnit.SECONDS).size() +
+                        results.get(1).get(20, TimeUnit.SECONDS).size() +
+                        results.get(2).get(20, TimeUnit.SECONDS).size(),
+                     is(count));
+          System.err.println("messages received");
+          //BODY-E
+
+          //TEAR-DOWN-S
+          connection1.stop();
+          connection2.stop();
+          subscriber1.close();
+          subscriber2.close();
+          subscriber3.close();
+          session.close();
+          session2.close();
+          //TEAR-DOWN-E
+          // connection1 and connection2 are closed by try-with-resources.
+       }
+    }
+ }
+
+
+ /**
+  * Sends every message in {@code messages}, in list order, through
+  * {@code producer}.
+  *
+  * @param producer the producer used for each send
+  * @param messages the messages to send
+  * @throws JMSException if any send fails; the failure is propagated instead of
+  *         being printed and ignored, so the test fails at the point of the
+  *         real problem rather than on a later receive
+  */
+ private void sendMessages(MessageProducer producer, List<Message> messages) throws JMSException {
+    for (Message message : messages) {
+       producer.send(message);
+    }
+ }
+
+ /**
+  * Receives up to {@code count} messages from {@code consumer} by delegating to
+  * {@link #receiveMessages(MessageConsumer, int, long)} with a timeout of
+  * {@code 0}, which that overload maps to the no-argument
+  * {@code consumer.receive()} call.
+  *
+  * @param consumer the consumer to read from
+  * @param count the number of receive calls to make
+  * @return the list built by the three-argument overload
+  */
+ protected List<Message> receiveMessages(MessageConsumer consumer, int count) {
+ return receiveMessages(consumer, count, 0);
+ }
+
+ /**
+  * Receives up to {@code count} messages from {@code consumer}.
+  * <p>
+  * When {@code timeout} is greater than zero each receive waits at most that
+  * many milliseconds; otherwise the no-argument {@code receive()} is used.
+  * Receiving stops early as soon as a receive call returns {@code null}, so
+  * the returned list never contains {@code null} and its size tells the caller
+  * how many messages actually arrived.
+  *
+  * @param consumer the consumer to read from
+  * @param count the maximum number of messages to receive
+  * @param timeout per-message timeout in milliseconds, or a value {@code <= 0}
+  *        to use the no-argument receive
+  * @return the messages received, in arrival order; possibly fewer than
+  *         {@code count}
+  * @throws AssertionError if the consumer throws a {@link JMSException}; the
+  *         original exception is kept as the cause
+  */
+ protected List<Message> receiveMessages(MessageConsumer consumer, int count, long timeout) {
+    List<Message> recvd = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+       final Message message;
+       try {
+          message = timeout > 0 ? consumer.receive(timeout) : consumer.receive();
+       } catch (JMSException e) {
+          // Fail immediately instead of printing the stack trace and carrying on.
+          throw new AssertionError("Failed to receive message " + i + " of " + count, e);
+       }
+       if (message == null) {
+          // Nothing was delivered by this receive call: stop here so that the
+          // caller's size assertion reports the shortfall.
+          break;
+       }
+       recvd.add(message);
+    }
+    return recvd;
+ }
+
+ /**
+  * Asserts that every message in {@code msgs} is a {@link TextMessage} whose
+  * text contains {@code content}.
+  *
+  * @param msgs the messages to check
+  * @param content the substring each message text must contain
+  * @throws AssertionError if a text does not contain {@code content}, if a text
+  *         is {@code null}, or if reading the text throws a
+  *         {@link JMSException} (kept as the cause)
+  */
+ protected void assertMessageContent(List<Message> msgs, String content) {
+    for (Message m : msgs) {
+       final String text;
+       try {
+          text = ((TextMessage) m).getText();
+       } catch (JMSException e) {
+          // A message whose text cannot be read must fail the check rather
+          // than be skipped silently.
+          throw new AssertionError("Could not read the text of " + m, e);
+       }
+       Assert.assertTrue("Expected message text to contain '" + content + "' but was: " + text, text != null && text.contains(content));
+    }
+ }
+
+ /**
+  * Creates {@code count} text messages with an empty prefix by delegating to
+  * {@link #generateMessages(Session, String, int)}.
+  *
+  * @param session the session used to create the messages
+  * @param count the number of messages to create
+  * @return the list built by the three-argument overload
+  */
+ protected List<Message> generateMessages(Session session, int count) {
+ return generateMessages(session, "", count);
+ }
+
+ /**
+  * Creates {@code count} text messages whose bodies are
+  * {@code prefix + "testMessage" + i} for {@code i} in {@code [0, count)}.
+  * <p>
+  * Each body is built independently, so a failure on one message can no longer
+  * leak partial text into the next one.
+  *
+  * @param session the session used to create the messages
+  * @param prefix text placed in front of every message body
+  * @param count the number of messages to create
+  * @return the created messages, in index order
+  * @throws AssertionError if the session throws a {@link JMSException}; the
+  *         original exception is kept as the cause
+  */
+ protected List<Message> generateMessages(Session session, String prefix, int count) {
+    List<Message> messages = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+       String text = prefix + "testMessage" + i;
+       try {
+          messages.add(session.createTextMessage(text));
+       } catch (JMSException e) {
+          // Returning a short list would only surface later as a confusing
+          // receive shortfall; fail where the problem happens.
+          throw new AssertionError("Failed to create message '" + text + "'", e);
+       }
+    }
+    return messages;
+ }
+
+ /**
+  * Registers a message listener on each of the given consumers and returns one
+  * future per consumer, in the same order as the arguments.
+  * <p>
+  * Every listener appends the messages it is handed to a list owned by its
+  * consumer and decrements a counter shared by all listeners that starts at
+  * {@code count}. The listener whose decrement brings the counter to zero
+  * completes all futures, each with the list collected for its consumer.
+  *
+  * @param count the total number of messages, across all consumers, after
+  *        which the futures are completed
+  * @param consumer the consumers to listen on
+  * @return one future per consumer
+  * @throws JMSException if a listener cannot be registered
+  */
+ protected List<CompletableFuture<List<Message>>> receiveMessagesAsync(int count, MessageConsumer... consumer) throws JMSException {
+    final AtomicInteger pending = new AtomicInteger(count);
+    final List<CompletableFuture<List<Message>>> futures = new ArrayList<>();
+    final List<List<Message>> buckets = new ArrayList<>();
+
+    for (MessageConsumer current : consumer) {
+       final List<Message> bucket = new ArrayList<>();
+       futures.add(new CompletableFuture<>());
+       buckets.add(bucket);
+       current.setMessageListener(message -> {
+          System.err.println("Mesages received" + message + " count: " + pending.get());
+          bucket.add(message);
+          if (pending.decrementAndGet() != 0) {
+             return;
+          }
+          // Last expected message: hand every consumer's list to its future.
+          for (int slot = 0; slot < consumer.length; slot++) {
+             futures.get(slot).complete(buckets.get(slot));
+          }
+       });
+    }
+    return futures;
+ }
+}
[3/3] activemq-artemis git commit: This closes #1678
Posted by jb...@apache.org.
This closes #1678
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a822af47
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a822af47
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a822af47
Branch: refs/heads/master
Commit: a822af4712a5a10f0c32ac9a0f7dbe28bd6eebad
Parents: 4584ac6 8b7282d
Author: Justin Bertram <jb...@apache.org>
Authored: Wed Nov 29 10:49:36 2017 -0600
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed Nov 29 10:49:36 2017 -0600
----------------------------------------------------------------------
.../artemis/utils/ReferenceCounter.java | 15 ++
.../artemis/utils/ReferenceCounterUtil.java | 54 +++-
.../artemis/utils/ReferenceCounterTest.java | 15 +-
.../amqp/broker/AMQPSessionCallback.java | 33 ++-
.../activemq/artemis/core/server/Queue.java | 4 +
.../artemis/core/server/impl/QueueImpl.java | 20 ++
.../core/server/impl/QueueManagerImpl.java | 80 +++---
.../core/server/impl/ServerConsumerImpl.java | 2 +
.../server/impl/TransientQueueManagerImpl.java | 41 +--
.../impl/ScheduledDeliveryHandlerTest.java | 5 +
.../integration/amqp/TopicDurableTests.java | 259 +++++++++++++++++++
.../tests/integration/client/ConsumerTest.java | 18 +-
.../unit/core/postoffice/impl/FakeQueue.java | 6 +
13 files changed, 458 insertions(+), 94 deletions(-)
----------------------------------------------------------------------