You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/09/27 16:46:13 UTC

[01/14] activemq git commit: AMQ-6422 - include the inflight count in the prefetch for positive remote credit flows. Fix and test

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x 02d53a0d9 -> 195046c50


AMQ-6422 - include the inflight count in the prefetch for positive remote credit flows. Fix and test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/94ffb1bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/94ffb1bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/94ffb1bf

Branch: refs/heads/activemq-5.14.x
Commit: 94ffb1bf8142dcb4c2c1c3c200fb7569c4b03273
Parents: 02d53a0
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 7 17:28:35 2016 +0100
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:12:33 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpConnection.java |  13 +++
 .../transport/amqp/protocol/AmqpSender.java     |  24 ++++-
 .../amqp/interop/AmqpSendReceiveTest.java       | 106 +++++++++++++++++++
 .../activemq/broker/region/RegionBroker.java    |   2 +-
 4 files changed, 139 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/94ffb1bf/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 929fa24..5a402ba 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -45,8 +45,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.InvalidClientIDException;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.AbstractRegion;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTempDestination;
@@ -712,6 +714,17 @@ public class AmqpConnection implements AmqpProtocolConverter {
         return result;
     }
 
+
+    Subscription lookupPrefetchSubscription(ConsumerInfo consumerInfo)  {
+        Subscription subscription = null;
+        try {
+            subscription = ((AbstractRegion)((RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class)).getRegion(consumerInfo.getDestination())).getSubscriptions().get(consumerInfo.getConsumerId());
+        } catch (Exception e) {
+            LOG.warn("Error finding subscription for: " + consumerInfo + ": " + e.getMessage(), false, e);
+        }
+        return subscription;
+    }
+
     ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
         ActiveMQDestination rc = null;
         if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/94ffb1bf/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 12bd627..0b85858 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -21,6 +21,7 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
 import java.io.IOException;
 import java.util.LinkedList;
 
+import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerControl;
@@ -52,6 +53,7 @@ import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Sender;
 import org.fusesource.hawtbuf.Buffer;
 import org.slf4j.Logger;
@@ -79,6 +81,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
 
     private final ConsumerInfo consumerInfo;
+    private Subscription subscription;
     private final boolean presettle;
 
     private boolean draining;
@@ -108,6 +111,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public void open() {
         if (!isClosed()) {
             session.registerSender(getConsumerId(), this);
+            subscription = session.getConnection().lookupPrefetchSubscription(consumerInfo);
         }
 
         super.open();
@@ -162,13 +166,14 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
     @Override
     public void flow() throws Exception {
+        Link endpoint = getEndpoint();
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}",
-                      draining, getEndpoint().getDrain(),
-                      getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
+            LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}, unsettled={}",
+                    draining, endpoint.getDrain(),
+                    endpoint.getCredit(), endpoint.getRemoteCredit(), endpoint.getQueued(), endpoint.getUnsettled());
         }
 
-        if (getEndpoint().getDrain() && !draining) {
+        if (endpoint.getDrain() && !draining) {
 
             // Revert to a pull consumer.
             ConsumerControl control = new ConsumerControl();
@@ -207,7 +212,16 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             ConsumerControl control = new ConsumerControl();
             control.setConsumerId(getConsumerId());
             control.setDestination(getDestination());
-            control.setPrefetch(getEndpoint().getCredit());
+
+            int remoteCredit = endpoint.getRemoteCredit();
+            if (remoteCredit > 0 && subscription != null) {
+                // ensure prefetch exceeds credit + inflight
+                if (remoteCredit + endpoint.getUnsettled() + endpoint.getQueued() > subscription.getPrefetchSize()) {
+                    LOG.trace("Adding dispatched size to credit for sub: " + subscription);
+                    remoteCredit += subscription.getDispatchedQueueSize();
+                }
+            }
+            control.setPrefetch(remoteCredit);
 
             LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/94ffb1bf/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index e48fef7..c27c0f9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -22,6 +22,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.LinkedList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
@@ -92,6 +96,108 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testReceiveFlowDispositionSingleCredit() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        for (int i=0;i<2; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("msg" + i);
+            sender.send(message);
+        }
+        sender.close();
+        connection.close();
+
+        LOG.info("Starting consumer connection");
+        connection = client.connect();
+        session = connection.createSession();
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+
+        receiver.flow(1);
+        received.accept();
+
+        received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+
+        receiver.close();
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveFlowDispositionSingleCreditTopic() throws Exception {
+        final AmqpClient client = createAmqpClient();
+        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+        final CountDownLatch receiverReady = new CountDownLatch(1);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Starting consumer connection");
+                    AmqpConnection connection = client.connect();
+                    AmqpSession session = connection.createSession();
+                    AmqpReceiver receiver = session.createReceiver("topic://" + getTestName());
+                    receiver.flow(1);
+                    receiverReady.countDown();
+                    AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+                    assertNotNull(received);
+
+                    receiver.flow(1);
+                    received.accept();
+
+                    received = receiver.receive(5, TimeUnit.SECONDS);
+                    assertNotNull(received);
+                    received.accept();
+
+                    receiver.close();
+                    connection.close();
+
+                } catch (Exception error) {
+                    errors.add(error);
+                }
+
+            }
+        });
+
+        // producer
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+
+                    receiverReady.await(20, TimeUnit.SECONDS);
+                    AmqpConnection connection = client.connect();
+                    AmqpSession session = connection.createSession();
+
+                    AmqpSender sender = session.createSender("topic://" + getTestName());
+                    for (int i = 0; i < 2; i++) {
+                        AmqpMessage message = new AmqpMessage();
+                        message.setMessageId("msg" + i);
+                        sender.send(message);
+                    }
+                    sender.close();
+                    connection.close();
+                } catch (Exception ignored) {
+                    ignored.printStackTrace();
+                }
+
+            }
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(20, TimeUnit.SECONDS);
+        assertTrue("no errors: " + errors, errors.isEmpty());
+    }
+
+
+    @Test(timeout = 60000)
     public void testReceiveWithJMSSelectorFilter() throws Exception {
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.connect();

http://git-wip-us.apache.org/repos/asf/activemq/blob/94ffb1bf/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 69e0930..036eed3 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -484,7 +484,7 @@ public class RegionBroker extends EmptyBroker {
         consumerExchange.getRegion().acknowledge(consumerExchange, ack);
     }
 
-    protected Region getRegion(ActiveMQDestination destination) throws JMSException {
+    public Region getRegion(ActiveMQDestination destination) throws JMSException {
         switch (destination.getDestinationType()) {
             case ActiveMQDestination.QUEUE_TYPE:
                 return queueRegion;


[12/14] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6422 - match proton sender view credit to prefetchExtension - tracking credit to dispatch delta to track additional flow requests. Proton sender layer is distinct from the transport l

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6422 - match proton sender view credit to prefetchExtension - tracking credit to dispatch delta to track additional flow requests. Proton sender layer is distinct from the transport layer - they mirror each other


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ebbb7ab4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ebbb7ab4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ebbb7ab4

Branch: refs/heads/activemq-5.14.x
Commit: ebbb7ab437b3023be5afe731a0015fec58a51d57
Parents: 0bb76c7
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 21 10:33:20 2016 +0100
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:15:50 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSender.java     | 91 ++++++++++----------
 .../amqp/JMSClientTransactionTest.java          |  6 --
 .../amqp/interop/AmqpSendReceiveTest.java       |  8 +-
 .../activemq/broker/region/AbstractRegion.java  |  2 +-
 .../broker/region/AbstractSubscription.java     | 12 +++
 .../broker/region/PrefetchSubscription.java     | 10 +--
 .../broker/region/QueueSubscription.java        |  2 +-
 .../broker/region/TopicSubscription.java        | 21 ++++-
 .../TopicSubscriptionZeroPrefetchTest.java      | 19 ++++
 9 files changed, 103 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 0b85858..75f2371 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -20,8 +20,9 @@ import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.AbstractSubscription;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ConsumerControl;
@@ -81,7 +82,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final String MESSAGE_FORMAT_KEY = outboundTransformer.getPrefixVendor() + "MESSAGE_FORMAT";
 
     private final ConsumerInfo consumerInfo;
-    private Subscription subscription;
+    private AbstractSubscription subscription;
+    private AtomicInteger prefetchExtension;
+    private int currentCreditRequest;
+    private int logicalDeliveryCount; // echoes prefetch extension but from protons perspective
     private final boolean presettle;
 
     private boolean draining;
@@ -111,7 +115,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public void open() {
         if (!isClosed()) {
             session.registerSender(getConsumerId(), this);
-            subscription = session.getConnection().lookupPrefetchSubscription(consumerInfo);
+            subscription = (AbstractSubscription)session.getConnection().lookupPrefetchSubscription(consumerInfo);
+            prefetchExtension = subscription.getPrefetchExtension();
         }
 
         super.open();
@@ -168,24 +173,15 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public void flow() throws Exception {
         Link endpoint = getEndpoint();
         if (LOG.isTraceEnabled()) {
-            LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}, unsettled={}",
+            LOG.trace("Flow: draining={}, drain={} credit={}, currentCredit={}, senderDeliveryCount={} - Sub={}",
                     draining, endpoint.getDrain(),
-                    endpoint.getCredit(), endpoint.getRemoteCredit(), endpoint.getQueued(), endpoint.getUnsettled());
+                    endpoint.getCredit(), currentCreditRequest, logicalDeliveryCount, subscription);
         }
 
+        final int endpointCredit = endpoint.getCredit();
         if (endpoint.getDrain() && !draining) {
 
-            // Revert to a pull consumer.
-            ConsumerControl control = new ConsumerControl();
-            control.setConsumerId(getConsumerId());
-            control.setDestination(getDestination());
-            control.setPrefetch(0);
-
-            LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output");
-
-            sendToActiveMQ(control);
-
-            if (endpoint.getCredit() > 0) {
+            if (endpointCredit > 0) {
                 draining = true;
 
                 // Now request dispatch of the drain amount, we request immediate
@@ -196,9 +192,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 pullRequest.setDestination(getDestination());
                 pullRequest.setTimeout(-1);
                 pullRequest.setAlwaysSignalDone(true);
-                pullRequest.setQuantity(endpoint.getCredit());
+                pullRequest.setQuantity(endpointCredit);
 
-                LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit());
+                LOG.trace("Pull case -> consumer pull request quantity = {}", endpointCredit);
 
                 sendToActiveMQ(pullRequest);
             } else {
@@ -207,25 +203,36 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 pumpOutbound();
                 getEndpoint().drained();
                 session.pumpProtonToSocket();
+                currentCreditRequest = 0;
+                logicalDeliveryCount = 0;
             }
-        } else {
-            ConsumerControl control = new ConsumerControl();
-            control.setConsumerId(getConsumerId());
-            control.setDestination(getDestination());
-
-            int remoteCredit = endpoint.getRemoteCredit();
-            if (remoteCredit > 0 && subscription != null) {
-                // ensure prefetch exceeds credit + inflight
-                if (remoteCredit + endpoint.getUnsettled() + endpoint.getQueued() > subscription.getPrefetchSize()) {
-                    LOG.trace("Adding dispatched size to credit for sub: " + subscription);
-                    remoteCredit += subscription.getDispatchedQueueSize();
-                }
-            }
-            control.setPrefetch(remoteCredit);
+        } else if (endpointCredit >= 0) {
+
+            if (endpointCredit == 0 && currentCreditRequest != 0) {
+
+                prefetchExtension.set(0);
+                currentCreditRequest = 0;
+                logicalDeliveryCount = 0;
+                LOG.trace("Flow: credit 0 for sub:" + subscription);
 
-            LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
+            } else {
 
-            sendToActiveMQ(control);
+                int deltaToAdd = endpointCredit;
+                int logicalCredit = currentCreditRequest - logicalDeliveryCount;
+                if (logicalCredit > 0) {
+                    deltaToAdd -= logicalCredit;
+                } else {
+                    // reset delivery counter - dispatch from broker concurrent with credit=0 flow can go negative
+                    logicalDeliveryCount = 0;
+                }
+                if (deltaToAdd > 0) {
+                    currentCreditRequest = prefetchExtension.addAndGet(deltaToAdd);
+                    subscription.wakeupDestinationsForDispatch();
+                    // force dispatch of matched/pending for topics (pending messages accumulate in the sub and are dispatched on update of prefetch)
+                    subscription.setPrefetchSize(0);
+                    LOG.trace("Flow: credit addition of {} for sub {}", deltaToAdd, subscription);
+                }
+            }
         }
     }
 
@@ -285,6 +292,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         }
 
         pumpOutbound();
+        logicalDeliveryCount++;
     }
 
     @Override
@@ -440,6 +448,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                     // It's the end of browse signal in response to a MessagePull
                     getEndpoint().drained();
                     draining = false;
+                    currentCreditRequest = 0;
+                    logicalDeliveryCount = 0;
                 } else {
                     if (LOG.isTraceEnabled()) {
                         LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
@@ -451,6 +461,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                         LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
                         getEndpoint().drained();
                         draining = false;
+                        currentCreditRequest = 0;
+                        logicalDeliveryCount = 0;
                     }
 
                     jms.setRedeliveryCounter(md.getRedeliveryCounter());
@@ -481,17 +493,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             tagCache.returnTag(tag);
         }
 
-        int newCredit = Math.max(0, getEndpoint().getCredit() - 1);
-        LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.",
-                  getEndpoint().getName(), newCredit);
-
-        ConsumerControl control = new ConsumerControl();
-        control.setConsumerId(getConsumerId());
-        control.setDestination(getDestination());
-        control.setPrefetch(newCredit);
-
-        sendToActiveMQ(control);
-
         if (ackType == -1) {
             // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
             delivery.settle();

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
index 1251410..f481ba9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
@@ -193,8 +193,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
         assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
         SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
         assertNotNull(subscription);
-        LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
-        assertTrue(subscription.getPrefetchSize() > 0);
 
         for (int i = 1; i <= MSG_COUNT; i++) {
             LOG.info("Trying to receive message: {}", i);
@@ -259,8 +257,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
         assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
         SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
         assertNotNull(subscription);
-        LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
-        assertTrue(subscription.getPrefetchSize() > 0);
 
         assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
 
@@ -273,7 +269,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
         LOG.info("COMMIT of first received batch here:");
         session.commit();
 
-        assertTrue(subscription.getPrefetchSize() > 0);
         for (int i = 1; i <= MSG_COUNT; i++) {
             LOG.info("Sending message: {} to commit", msgIndex++);
             TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
@@ -286,7 +281,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
 
         LOG.info("WAITING -> for next three messages to arrive:");
 
-        assertTrue(subscription.getPrefetchSize() > 0);
         assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
 
             @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index 3132e6e..34436f2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -294,7 +294,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         receiver2.flow(splitCredit);
         for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
-            assertNotNull("Receiver #2 should have read a message", message);
+            assertNotNull("Receiver #2 should have read message[" + i + "]", message);
             LOG.info("Receiver #2 read message: {}", message.getMessageId());
             message.accept();
         }
@@ -671,7 +671,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         LOG.info("*** Attempting to read remaining messages with both receivers");
         int splitCredit = (MSG_COUNT - 4) / 2;
 
-        LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
+        LOG.info("**** Receiver #1 granting credit[{}] for its block of messages", splitCredit);
         receiver1.flow(splitCredit);
         for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
@@ -680,11 +680,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
             message.accept();
         }
 
-        LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
+        LOG.info("**** Receiver #2 granting credit[{}] for its block of messages", splitCredit);
         receiver2.flow(splitCredit);
         for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
-            assertNotNull("Receiver #2 should have read a message", message);
+            assertNotNull("Receiver #2 should have read a message[" + i + "]", message);
             LOG.info("Receiver #2 read message: {}", message.getMessageId());
             message.accept();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
index be77b6e..13251c8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
@@ -688,7 +688,7 @@ public abstract class AbstractRegion implements Region {
                     entry.configurePrefetch(sub);
                 }
             }
-            LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getCurrentPrefetchSize()});
+            LOG.debug("setting prefetch: {}, on subscription: {}; resulting value: {}", new Object[]{ control.getPrefetch(), control.getConsumerId(), sub.getConsumerInfo().getPrefetchSize()});
             try {
                 lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
index 1d84269..3cb2f1f 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
@@ -49,6 +50,7 @@ public abstract class AbstractSubscription implements Subscription {
     protected ConsumerInfo info;
     protected final DestinationFilter destinationFilter;
     protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
+    protected final AtomicInteger prefetchExtension = new AtomicInteger(0);
 
     private BooleanExpression selectorExpression;
     private ObjectName objectName;
@@ -309,4 +311,14 @@ public abstract class AbstractSubscription implements Subscription {
     public SubscriptionStatistics getSubscriptionStatistics() {
         return subscriptionStatistics;
     }
+
+    public void wakeupDestinationsForDispatch() {
+        for (Destination dest : destinations) {
+            dest.wakeup();
+        }
+    }
+
+    public AtomicInteger getPrefetchExtension() {
+        return this.prefetchExtension;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 5254440..0a277fb 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -23,7 +23,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.JMSException;
 
@@ -57,7 +56,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
     protected PendingMessageCursor pending;
     protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
-    protected final AtomicInteger prefetchExtension = new AtomicInteger();
     protected boolean usePrefetchExtension = true;
     private int maxProducersToAudit=32;
     private int maxAuditDepth=2048;
@@ -431,9 +429,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
             dispatchPending();
 
             if (pending.isEmpty()) {
-                for (Destination dest : destinations) {
-                    dest.wakeup();
-                }
+                wakeupDestinationsForDispatch();
             }
         } else {
             LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): {}", ack);
@@ -904,10 +900,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
         this.usePrefetchExtension = usePrefetchExtension;
     }
 
-    protected int getPrefetchExtension() {
-        return this.prefetchExtension.get();
-    }
-
     @Override
     public void setPrefetchSize(int prefetchSize) {
         this.info.setPrefetchSize(prefetchSize);

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
index 358f946..6e865ec 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
@@ -69,7 +69,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
     @Override
     public synchronized String toString() {
         return "QueueSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() + ", delivered="
-               + this.prefetchExtension + ", pending=" + getPendingQueueSize();
+               + this.prefetchExtension + ", pending=" + getPendingQueueSize() + ", prefetch=" + getPrefetchSize() + ", prefetchExtension=" + prefetchExtension.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index eff2393..6ab264d 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -64,7 +64,6 @@ public class TopicSubscription extends AbstractSubscription {
     private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
     private int discarded;
     private final Object matchedListMutex = new Object();
-    private final AtomicInteger prefetchExtension = new AtomicInteger(0);
     private int memoryUsageHighWaterMark = 95;
     // allow duplicate suppression in a ring network of brokers
     protected int maxProducersToAudit = 1024;
@@ -410,6 +409,16 @@ public class TopicSubscription extends AbstractSubscription {
         }
     }
 
+    private void decrementPrefetchExtension() {
+        while (true) {
+            int currentExtension = prefetchExtension.get();
+            int newExtension = Math.max(0, currentExtension - 1);
+            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                break;
+            }
+        }
+    }
+
     @Override
     public int countBeforeFull() {
         return getPrefetchSize() == 0 ? prefetchExtension.get() : info.getPrefetchSize() + prefetchExtension.get() - getDispatchedQueueSize();
@@ -529,6 +538,9 @@ public class TopicSubscription extends AbstractSubscription {
     // -------------------------------------------------------------------------
     @Override
     public boolean isFull() {
+        if (info.getPrefetchSize() == 0) {
+            return prefetchExtension.get() == 0;
+        }
         return getDispatchedQueueSize() >= info.getPrefetchSize();
     }
 
@@ -655,6 +667,11 @@ public class TopicSubscription extends AbstractSubscription {
                     }
                 }
             }
+
+            if (getPrefetchSize() == 0) {
+                decrementPrefetchExtension();
+            }
+
         }
         if (info.isDispatchAsync()) {
             if (node != null) {
@@ -712,7 +729,7 @@ public class TopicSubscription extends AbstractSubscription {
     @Override
     public String toString() {
         return "TopicSubscription:" + " consumer=" + info.getConsumerId() + ", destinations=" + destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered="
-                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
+                + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded() + ", prefetchExtension=" + prefetchExtension.get();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ebbb7ab4/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
index b9f0d50..38fa921 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TopicSubscriptionZeroPrefetchTest.java
@@ -22,6 +22,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -75,6 +76,24 @@ public class TopicSubscriptionZeroPrefetchTest {
         Assert.assertNotNull("should have received a message the published message", consumedMessage);
     }
 
+    @Test(timeout=60000)
+    public void testTopicConsumerPrefetchZeroClientAckLoop() throws Exception {
+        ActiveMQTopic consumerDestination = new ActiveMQTopic(TOPIC_NAME + "?consumer.retroactive=true&consumer.prefetchSize=0");
+        Session consumerClientAckSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = consumerClientAckSession.createConsumer(consumerDestination);
+
+        final int count = 10;
+        for (int i=0;i<count;i++) {
+            Message txtMessage = session.createTextMessage("M:"+ i);
+            producer.send(txtMessage);
+        }
+
+        for (int i=0;i<count;i++) {
+            Message consumedMessage = consumer.receive(2000);
+            Assert.assertNotNull("should have received message[" + i +"]", consumedMessage);
+        }
+    }
+
     /*
      * test durable topic subscription with prefetch zero
      */


[04/14] activemq git commit: Allow the AMQP test client to also be configure to trace frames

Posted by ta...@apache.org.
Allow the AMQP test client to also be configure to trace frames

The test client can allow for quick tracing of the frame traffic via a
call to setTraceFrames on the client or connection instance before
connection to the remote.  This allows for tests to easily switch on /
off tracing.  The log4j.properties is also updated to output frame
tracing with the URI option is put on the AMQP transport or the client
value is enabled.  
Conflicts:
	activemq-amqp/src/test/resources/log4j.properties


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a5a4262b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a5a4262b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a5a4262b

Branch: refs/heads/activemq-5.14.x
Commit: a5a4262b54e979a4735fe3fd3a598dd5e0c7ccb6
Parents: b77b3c4
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Sep 8 16:19:37 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:14:08 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpClient.java       | 22 +++++++++++++-
 .../transport/amqp/client/AmqpConnection.java   | 32 +++++++++++++++++++-
 .../amqp/interop/AmqpAnonymousSenderTest.java   |  2 ++
 .../src/test/resources/log4j.properties         |  5 +--
 4 files changed, 57 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a5a4262b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index 78b1aa0..8c794ea 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -21,8 +21,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
 import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
+import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +40,7 @@ public class AmqpClient {
     private final URI remoteURI;
     private String authzid;
     private String mechanismRestriction;
+    private boolean traceFrames;
 
     private AmqpValidator stateInspector = new AmqpValidator();
     private List<Symbol> offeredCapabilities = Collections.emptyList();
@@ -103,6 +104,7 @@ public class AmqpClient {
         connection.setOfferedCapabilities(getOfferedCapabilities());
         connection.setOfferedProperties(getOfferedProperties());
         connection.setStateInspector(getStateInspector());
+        connection.setTraceFrames(isTraceFrames());
 
         return connection;
     }
@@ -218,6 +220,24 @@ public class AmqpClient {
         this.stateInspector = stateInspector;
     }
 
+    /**
+     * @return the traceFrames setting for the client, true indicates frame tracing is on.
+     */
+    public boolean isTraceFrames() {
+        return traceFrames;
+    }
+
+    /**
+     * Controls whether connections created from this client object will log AMQP
+     * frames to a trace level logger or not.
+     *
+     * @param traceFrames
+     *      configure the trace frames option for the client created connections.
+     */
+    public void setTraceFrames(boolean traceFrames) {
+        this.traceFrames = traceFrames;
+    }
+
     @Override
     public String toString() {
         return "AmqpClient: " + getRemoteURI().getHost() + ":" + getRemoteURI().getPort();

http://git-wip-us.apache.org/repos/asf/activemq/blob/a5a4262b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 85a1d22..1c795a8 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -49,6 +49,9 @@ import org.apache.qpid.proton.engine.Event.Type;
 import org.apache.qpid.proton.engine.Sasl;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.impl.CollectorImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
+import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,6 +62,7 @@ import io.netty.util.ReferenceCountUtil;
 public class AmqpConnection extends AmqpAbstractResource<Connection> implements NettyTransportListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
+    private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpConnection.class.getPackage().getName() + ".FRAMES");
 
     private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult();
 
@@ -101,6 +105,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
     private long connectTimeout = DEFAULT_CONNECT_TIMEOUT;
     private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
     private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
+    private boolean trace;
 
     public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
         setEndpoint(Connection.Factory.create());
@@ -154,6 +159,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                         sasl.client();
                     }
                     authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction);
+                    updateTracer();
                     open(future);
 
                     pumpToProtonTransport(future);
@@ -434,6 +440,14 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
         return mechanismRestriction;
     }
 
+    public boolean isTraceFrames() {
+        return trace;
+    }
+
+    public void setTraceFrames(boolean trace) {
+        this.trace = trace;
+    }
+
     //----- Internal getters used from the child AmqpResource classes --------//
 
     ScheduledExecutorService getScheduler() {
@@ -698,6 +712,22 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
         return containerId;
     }
 
+    private void updateTracer() {
+        if (isTraceFrames()) {
+            ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
+                @Override
+                public void receivedFrame(TransportFrame transportFrame) {
+                    TRACE_FRAMES.trace("{} | RECV: {}", getRemoteURI(), transportFrame.getBody());
+                }
+
+                @Override
+                public void sentFrame(TransportFrame transportFrame) {
+                    TRACE_FRAMES.trace("{} | SENT: {}", this, transportFrame.getBody());
+                }
+            });
+        }
+    }
+
     @Override
     public String toString() {
         return "AmqpConnection { " + connectionId + " }";

http://git-wip-us.apache.org/repos/asf/activemq/blob/a5a4262b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java
index f7c4356..efba381 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java
@@ -39,6 +39,8 @@ public class AmqpAnonymousSenderTest extends AmqpClientTestSupport {
     public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
 
         AmqpClient client = createAmqpClient();
+        client.setTraceFrames(false);
+
         AmqpConnection connection = client.connect();
         AmqpSession session = connection.createSession();
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/a5a4262b/activemq-amqp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties
index f88b152..64d4fb0 100755
--- a/activemq-amqp/src/test/resources/log4j.properties
+++ b/activemq-amqp/src/test/resources/log4j.properties
@@ -20,8 +20,9 @@
 #
 log4j.rootLogger=WARN, console, file
 log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.transport.amqp=TRACE
-log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
+log4j.logger.org.apache.activemq.transport.amqp=DEBUG
+log4j.logger.org.apache.activemq.transport.amqp.client.FRAMES=TRACE
+log4j.logger.org.apache.activemq.transport.amqp.FRAMES=TRACE
 log4j.logger.org.fusesource=INFO
 
 # Configure various level of detail for Qpid JMS logs.


[07/14] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6422

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6422

Adds a split consumer test that uses presettled receivers.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/14c5c527
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/14c5c527
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/14c5c527

Branch: refs/heads/activemq-5.14.x
Commit: 14c5c5276c9f6bfa360c24d4e2b875483dc43888
Parents: 566e826
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Sep 9 18:34:03 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:14:55 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpSendReceiveTest.java       | 79 ++++++++++++++++++++
 1 file changed, 79 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/14c5c527/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index f39fc3e..3132e6e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -615,4 +615,83 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         receiver.close();
         connection.close();
     }
+
+    @Test(timeout = 60000)
+    public void testTwoPresettledReceiversReceiveAllMessages() throws Exception {
+        final int MSG_COUNT = 100;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        final String address = "queue://" + getTestName();
+
+        AmqpSender sender = session.createSender(address);
+        AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
+        AmqpReceiver receiver2 = session.createReceiver(address, null, false, true);
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("msg" + i);
+            sender.send(message);
+        }
+
+        final DestinationViewMBean destinationView = getProxyToQueue(getTestName());
+
+        LOG.info("Attempting to read first two messages with receiver #1");
+        receiver1.flow(2);
+        AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message 1", message1);
+        assertNotNull("Should have read message 2", message2);
+        assertEquals("msg0", message1.getMessageId());
+        assertEquals("msg1", message2.getMessageId());
+        message1.accept();
+        message2.accept();
+
+        LOG.info("Attempting to read next two messages with receiver #2");
+        receiver2.flow(2);
+        AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
+        AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message 3", message3);
+        assertNotNull("Should have read message 4", message4);
+        assertEquals("msg2", message3.getMessageId());
+        assertEquals("msg3", message4.getMessageId());
+        message3.accept();
+        message4.accept();
+
+        assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destinationView.getInFlightCount() == 0;
+            }
+        }));
+
+        LOG.info("*** Attempting to read remaining messages with both receivers");
+        int splitCredit = (MSG_COUNT - 4) / 2;
+
+        LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
+        receiver1.flow(splitCredit);
+        for (int i = 0; i < splitCredit; i++) {
+            AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
+            assertNotNull("Receiver #1 should have read a message", message);
+            LOG.info("Receiver #1 read message: {}", message.getMessageId());
+            message.accept();
+        }
+
+        LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
+        receiver2.flow(splitCredit);
+        for (int i = 0; i < splitCredit; i++) {
+            AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
+            assertNotNull("Receiver #2 should have read a message", message);
+            LOG.info("Receiver #2 read message: {}", message.getMessageId());
+            message.accept();
+        }
+
+        receiver1.close();
+        receiver2.close();
+
+        connection.close();
+    }
 }


[14/14] activemq git commit: NO-JIRA: Adding an extra test on AmqpTransactionTest

Posted by ta...@apache.org.
NO-JIRA: Adding an extra test on AmqpTransactionTest

The test I'm adding was back ported from Artemis.
It will validate if the ACKs are nacked in case of a connection.close();
To avoid a situation where the TX would sit on a Transaction Resource Manager somewhere like an XID.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/195046c5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/195046c5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/195046c5

Branch: refs/heads/activemq-5.14.x
Commit: 195046c50359667fc6d9480cf13c3d9e1d123d66
Parents: 8e6fe41
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Sep 21 16:12:52 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:16:16 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpTransactionTest.java       | 45 ++++++++++++++++++++
 1 file changed, 45 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/195046c5/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index 994a2e7..0815f8a 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -151,6 +151,51 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testReceiveAfterConnectionClose() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender(getTestName());
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+        sender.send(message);
+
+        assertEquals(1, queue.getQueueSize());
+
+        AmqpReceiver receiver = session.createReceiver(getTestName());
+
+        session.begin();
+
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+
+        // this will force a rollback on the TX (It should at least)
+        connection.close();
+
+        connection = client.connect();
+        session = connection.createSession();
+        receiver = session.createReceiver(getTestName());
+        session.begin();
+        receiver.flow(1);
+
+        received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+
+        session.commit();
+
+        assertEquals(0, queue.getQueueSize());
+
+        connection.close();
+    }
+
+
+    @Test(timeout = 60000)
     public void testReceiveMessageWithRollback() throws Exception {
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.connect();


[06/14] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6422

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6422

Small fix to test and check for zero inflight on successive send to
destination that should have no credit on the registered receiver.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/566e8261
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/566e8261
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/566e8261

Branch: refs/heads/activemq-5.14.x
Commit: 566e82614aa3cb31c80d44d155debe0e63cc2a3c
Parents: ca11674
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Sep 9 13:02:04 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:14:43 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpSendReceiveTest.java       | 26 +++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/566e8261/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index 752c341..f39fc3e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import javax.jms.Queue;
 import javax.jms.Topic;
 
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.junit.ActiveMQTestRunner;
 import org.apache.activemq.junit.Repeat;
@@ -43,6 +44,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -569,12 +571,18 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         AmqpReceiver receiver = session.createReceiver(address);
         AmqpSender sender = session.createSender(address);
 
+        final DestinationViewMBean destinationView;
+        if (Queue.class.equals(destType)) {
+            destinationView = getProxyToQueue(getTestName());
+        } else {
+            destinationView = getProxyToTopic(getTestName());
+        }
+
         for (int i = 0; i < MSG_COUNT; i++) {
             AmqpMessage message = new AmqpMessage();
             message.setMessageId("msg" + i);
             sender.send(message);
         }
-        sender.close();
 
         List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
 
@@ -582,12 +590,28 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
             receiver.flow(1);
             AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
             assertNotNull(received);
+            pendingAcks.add(received);
         }
 
+        // Send one more to check in-flight stays at zero with no credit and all
+        // pending messages settled.
+        AmqpMessage message = new AmqpMessage();
+        message.setMessageId("msg-final");
+        sender.send(message);
+
         for (AmqpMessage pendingAck : pendingAcks) {
             pendingAck.accept();
         }
 
+        assertTrue("Should be no inflight messages: " + destinationView.getInFlightCount(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return destinationView.getInFlightCount() == 0;
+            }
+        }));
+
+        sender.close();
         receiver.close();
         connection.close();
     }


[05/14] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6422

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6422

Add test for credit grants but no settles for a single receiver.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ca11674f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ca11674f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ca11674f

Branch: refs/heads/activemq-5.14.x
Commit: ca11674f37cf3a67a9215f341a8e8458ce7b0641
Parents: a5a4262
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Sep 9 12:52:48 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:14:31 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpSendReceiveTest.java       | 53 ++++++++++++++++++++
 1 file changed, 53 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ca11674f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index 8a4958f..752c341 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -22,7 +22,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -538,4 +540,55 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
         connection.close();
     }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageBeyondAckedAmountQueue() throws Exception {
+        doTestReceiveMessageBeyondAckedAmount(Queue.class);
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageBeyondAckedAmountTopic() throws Exception {
+        doTestReceiveMessageBeyondAckedAmount(Topic.class);
+    }
+
+    private void doTestReceiveMessageBeyondAckedAmount(Class<?> destType) throws Exception {
+        final int MSG_COUNT = 50;
+
+        AmqpClient client = createAmqpClient();
+
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        final String address;
+        if (Queue.class.equals(destType)) {
+            address = "queue://" + getTestName();
+        } else {
+            address = "topic://" + getTestName();
+        }
+
+        AmqpReceiver receiver = session.createReceiver(address);
+        AmqpSender sender = session.createSender(address);
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("msg" + i);
+            sender.send(message);
+        }
+        sender.close();
+
+        List<AmqpMessage> pendingAcks = new ArrayList<AmqpMessage>();
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            receiver.flow(1);
+            AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(received);
+        }
+
+        for (AmqpMessage pendingAck : pendingAcks) {
+            pendingAck.accept();
+        }
+
+        receiver.close();
+        connection.close();
+    }
 }


[03/14] activemq git commit: Add a test case for anonymous sender links using simple test client.

Posted by ta...@apache.org.
Add a test case for anonymous sender links using simple test client.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b77b3c42
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b77b3c42
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b77b3c42

Branch: refs/heads/activemq-5.14.x
Commit: b77b3c428b7fa842c62960215b0457de7595da1c
Parents: cc0e787
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 7 17:27:31 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:13:04 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      | 25 +++++++
 .../transport/amqp/client/AmqpSession.java      | 11 +++
 .../amqp/interop/AmqpAnonymousSenderTest.java   | 71 ++++++++++++++++++++
 3 files changed, 107 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b77b3c42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index b954e04..99f4cfb 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -173,6 +173,31 @@ public class AmqpMessage {
     //----- Convenience methods for constructing outbound messages -----------//
 
     /**
+     * Sets the address which is applied to the AMQP message To field in the message properties
+     *
+     * @param address
+     *      The address that should be applied in the Message To field.
+     */
+    public void setAddress(String address) {
+        checkReadOnly();
+        lazyCreateProperties();
+        getWrappedMessage().setAddress(address);
+    }
+
+    /**
+     * Return the set address that was set in the Message To field.
+     *
+     * @return the set address String form or null if not set.
+     */
+    public String getAddress() {
+        if (message.getProperties() == null) {
+            return null;
+        }
+
+        return message.getProperties().getTo();
+    }
+
+    /**
      * Sets the MessageId property on an outbound message using the provided String
      *
      * @param messageId

http://git-wip-us.apache.org/repos/asf/activemq/blob/b77b3c42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 6ed7861..ae99f65 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -54,6 +54,17 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
     }
 
     /**
+     * Create an anonymous sender.
+     *
+     * @return a newly created sender that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the sender.
+     */
+    public AmqpSender createSender() throws Exception {
+        return createSender(null, false);
+    }
+
+    /**
      * Create a sender instance using the given address
      *
      * @param address

http://git-wip-us.apache.org/repos/asf/activemq/blob/b77b3c42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java
new file mode 100644
index 0000000..f7c4356
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpAnonymousSenderTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test for support of Anonymous sender links.
+ */
+public class AmqpAnonymousSenderTest extends AmqpClientTestSupport {
+
+    @Test(timeout = 60000)
+    public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender();
+
+        assertEquals(1, getProxyToBroker().getDynamicDestinationProducers().length);
+
+        AmqpMessage message = new AmqpMessage();
+
+        message.setAddress("queue://" + getTestName());
+        message.setMessageId("msg" + 1);
+        message.setMessageAnnotation("serialNo", 1);
+        message.setText("Test-Message");
+
+        sender.send(message);
+        sender.close();
+
+        LOG.info("Attempting to read message with receiver");
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message", received);
+        assertEquals("msg1", received.getMessageId());
+        received.accept();
+
+        receiver.close();
+
+        connection.close();
+    }
+}


[02/14] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6422

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6422

I've made a few minor test changes and added a couple more cases.  Under
heavy CPU load I'm able to get test,
testReceiveMessageAndRefillCreditBeforeAcceptOnQueue to fail on the
second receive call where it should get the second message since it
granted credit.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc0e7879
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc0e7879
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc0e7879

Branch: refs/heads/activemq-5.14.x
Commit: cc0e78790fe1c1d68a727d58fdf1b7cdad22b598
Parents: 94ffb1b
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 7 14:05:21 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:12:43 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpSendReceiveTest.java       | 284 ++++++++++++-------
 1 file changed, 175 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/cc0e7879/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
index c27c0f9..8a4958f 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -28,6 +28,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.Queue;
+import javax.jms.Topic;
+
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.junit.ActiveMQTestRunner;
 import org.apache.activemq.junit.Repeat;
@@ -44,7 +47,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ * Test basic send and receive scenarios using only AMQP sender and receiver
+ * links.
  */
 @RunWith(ActiveMQTestRunner.class)
 public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@@ -52,6 +56,37 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
 
     @Test(timeout = 60000)
+    public void testSimpleSendOneReceiveOne() throws Exception {
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+
+        message.setMessageId("msg" + 1);
+        message.setMessageAnnotation("serialNo", 1);
+        message.setText("Test-Message");
+
+        sender.send(message);
+        sender.close();
+
+        LOG.info("Attempting to read message with receiver");
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(2);
+        AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+        assertNotNull("Should have read message", received);
+        assertEquals("msg1", received.getMessageId());
+        received.accept();
+
+        receiver.close();
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
     public void testCloseBusyReceiver() throws Exception {
         final int MSG_COUNT = 20;
 
@@ -96,108 +131,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
-    public void testReceiveFlowDispositionSingleCredit() throws Exception {
-        AmqpClient client = createAmqpClient();
-        AmqpConnection connection = client.connect();
-        AmqpSession session = connection.createSession();
-
-        AmqpSender sender = session.createSender("queue://" + getTestName());
-        for (int i=0;i<2; i++) {
-            AmqpMessage message = new AmqpMessage();
-            message.setMessageId("msg" + i);
-            sender.send(message);
-        }
-        sender.close();
-        connection.close();
-
-        LOG.info("Starting consumer connection");
-        connection = client.connect();
-        session = connection.createSession();
-        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
-        receiver.flow(1);
-        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
-        assertNotNull(received);
-
-        receiver.flow(1);
-        received.accept();
-
-        received = receiver.receive(5, TimeUnit.SECONDS);
-        assertNotNull(received);
-        received.accept();
-
-        receiver.close();
-        connection.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testReceiveFlowDispositionSingleCreditTopic() throws Exception {
-        final AmqpClient client = createAmqpClient();
-        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
-        final CountDownLatch receiverReady = new CountDownLatch(1);
-        ExecutorService executorService = Executors.newCachedThreadPool();
-
-        executorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    LOG.info("Starting consumer connection");
-                    AmqpConnection connection = client.connect();
-                    AmqpSession session = connection.createSession();
-                    AmqpReceiver receiver = session.createReceiver("topic://" + getTestName());
-                    receiver.flow(1);
-                    receiverReady.countDown();
-                    AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
-                    assertNotNull(received);
-
-                    receiver.flow(1);
-                    received.accept();
-
-                    received = receiver.receive(5, TimeUnit.SECONDS);
-                    assertNotNull(received);
-                    received.accept();
-
-                    receiver.close();
-                    connection.close();
-
-                } catch (Exception error) {
-                    errors.add(error);
-                }
-
-            }
-        });
-
-        // producer
-        executorService.submit(new Runnable() {
-            @Override
-            public void run() {
-                try {
-
-                    receiverReady.await(20, TimeUnit.SECONDS);
-                    AmqpConnection connection = client.connect();
-                    AmqpSession session = connection.createSession();
-
-                    AmqpSender sender = session.createSender("topic://" + getTestName());
-                    for (int i = 0; i < 2; i++) {
-                        AmqpMessage message = new AmqpMessage();
-                        message.setMessageId("msg" + i);
-                        sender.send(message);
-                    }
-                    sender.close();
-                    connection.close();
-                } catch (Exception ignored) {
-                    ignored.printStackTrace();
-                }
-
-            }
-        });
-
-        executorService.shutdown();
-        executorService.awaitTermination(20, TimeUnit.SECONDS);
-        assertTrue("no errors: " + errors, errors.isEmpty());
-    }
-
-
-    @Test(timeout = 60000)
     public void testReceiveWithJMSSelectorFilter() throws Exception {
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.connect();
@@ -279,7 +212,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
 
         LOG.info("Attempting to read remaining messages with receiver #1");
         receiver1.flow(MSG_COUNT - 4);
-        for (int i = 4; i < MSG_COUNT - 4; i++) {
+        for (int i = 4; i < MSG_COUNT; i++) {
             AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
             assertNotNull("Should have read a message", message);
             assertEquals("msg" + i, message.getMessageId());
@@ -341,20 +274,24 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
         message3.accept();
         message4.accept();
 
-        LOG.info("Attempting to read remaining messages with both receivers");
+        LOG.info("*** Attempting to read remaining messages with both receivers");
         int splitCredit = (MSG_COUNT - 4) / 2;
 
+        LOG.info("**** Receiver #1 granting creadit[{}] for its block of messages", splitCredit);
         receiver1.flow(splitCredit);
-        for (int i = 4; i < splitCredit; i++) {
+        for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver1.receive(10, TimeUnit.SECONDS);
-            assertNotNull("Should have read a message", message);
+            assertNotNull("Receiver #1 should have read a message", message);
+            LOG.info("Receiver #1 read message: {}", message.getMessageId());
             message.accept();
         }
 
+        LOG.info("**** Receiver #2 granting creadit[{}] for its block of messages", splitCredit);
         receiver2.flow(splitCredit);
-        for (int i = 4; i < splitCredit; i++) {
+        for (int i = 0; i < splitCredit; i++) {
             AmqpMessage message = receiver2.receive(10, TimeUnit.SECONDS);
-            assertNotNull("Should have read a message", message);
+            assertNotNull("Receiver #2 should have read a message", message);
+            LOG.info("Receiver #2 read message: {}", message.getMessageId());
             message.accept();
         }
 
@@ -365,6 +302,135 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
     }
 
     @Test(timeout = 60000)
+    public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueue() throws Exception {
+        doTestReceiveMessageAndRefillCreditBeforeAccept(Queue.class);
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageAndRefillCreditBeforeAcceptOnTopic() throws Exception {
+        doTestReceiveMessageAndRefillCreditBeforeAccept(Topic.class);
+    }
+
+    private void doTestReceiveMessageAndRefillCreditBeforeAccept(Class<?> destType) throws Exception {
+
+        AmqpClient client = createAmqpClient();
+
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        final String address;
+        if (Queue.class.equals(destType)) {
+            address = "queue://" + getTestName();
+        } else {
+            address = "topic://" + getTestName();
+        }
+
+        AmqpReceiver receiver = session.createReceiver(address);
+        AmqpSender sender = session.createSender(address);
+
+        for (int i = 0; i < 2; i++) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("msg" + i);
+            sender.send(message);
+        }
+        sender.close();
+
+        receiver.flow(1);
+        AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+        assertNotNull(received);
+
+        receiver.flow(1);
+        received.accept();
+
+        received = receiver.receive(10, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.accept();
+
+        receiver.close();
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageAndRefillCreditBeforeAcceptOnQueueAsync() throws Exception {
+        doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Queue.class);
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync() throws Exception {
+        doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Topic.class);
+    }
+
+    private void doTestReceiveMessageAndRefillCreditBeforeAcceptOnTopicAsync(Class<?> destType) throws Exception {
+        final AmqpClient client = createAmqpClient();
+        final LinkedList<Throwable> errors = new LinkedList<Throwable>();
+        final CountDownLatch receiverReady = new CountDownLatch(1);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+
+        final String address;
+        if (Queue.class.equals(destType)) {
+            address = "queue://" + getTestName();
+        } else {
+            address = "topic://" + getTestName();
+        }
+
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Starting consumer connection");
+                    AmqpConnection connection = client.connect();
+                    AmqpSession session = connection.createSession();
+                    AmqpReceiver receiver = session.createReceiver(address);
+                    receiver.flow(1);
+                    receiverReady.countDown();
+                    AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+                    assertNotNull(received);
+
+                    receiver.flow(1);
+                    received.accept();
+
+                    received = receiver.receive(5, TimeUnit.SECONDS);
+                    assertNotNull(received);
+                    received.accept();
+
+                    receiver.close();
+                    connection.close();
+
+                } catch (Exception error) {
+                    errors.add(error);
+                }
+            }
+        });
+
+        // producer
+        executorService.submit(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    receiverReady.await(20, TimeUnit.SECONDS);
+                    AmqpConnection connection = client.connect();
+                    AmqpSession session = connection.createSession();
+
+                    AmqpSender sender = session.createSender(address);
+                    for (int i = 0; i < 2; i++) {
+                        AmqpMessage message = new AmqpMessage();
+                        message.setMessageId("msg" + i);
+                        sender.send(message);
+                    }
+                    sender.close();
+                    connection.close();
+                } catch (Exception ignored) {
+                    ignored.printStackTrace();
+                }
+            }
+        });
+
+        executorService.shutdown();
+        executorService.awaitTermination(20, TimeUnit.SECONDS);
+        assertTrue("no errors: " + errors, errors.isEmpty());
+    }
+
+    @Test(timeout = 60000)
     public void testMessageDurabliltyFollowsSpec() throws Exception {
         AmqpClient client = createAmqpClient();
         AmqpConnection connection = client.connect();


[08/14] activemq git commit: NO-JIRA AMQP Test updates

Posted by ta...@apache.org.
NO-JIRA AMQP Test updates

Adds support for doing sends and receives that are enrolled in a
transaction created in a session other than the session that created the
sender or receiver.  Adds some tests that show this in action.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fa551498
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fa551498
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fa551498

Branch: refs/heads/activemq-5.14.x
Commit: fa5514985d74ff2d0bfd7faebb28452a2a7eed2e
Parents: 14c5c52
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 14 18:23:52 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:15:06 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      |  19 +-
 .../transport/amqp/client/AmqpReceiver.java     |  26 +++
 .../transport/amqp/client/AmqpSender.java       |  29 ++-
 .../transport/amqp/client/AmqpSession.java      |   8 +-
 .../amqp/interop/AmqpTransactionTest.java       | 194 +++++++++++++++++++
 5 files changed, 269 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 99f4cfb..8b378e1 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -140,6 +140,22 @@ public class AmqpMessage {
     }
 
     /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @param session
+     *      The session that is used to manage acceptance of the message.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept(AmqpSession txnSession) throws Exception {
+        if (receiver == null) {
+            throw new IllegalStateException("Can't accept non-received message.");
+        }
+
+        receiver.accept(delivery, txnSession);
+    }
+
+    /**
      * Marks the message as Modified, indicating whether it failed to deliver and is not deliverable here.
      *
      * @param deliveryFailed
@@ -374,7 +390,7 @@ public class AmqpMessage {
      * @param key
      *        the name used to lookup the property in the application properties.
      *
-     * @return the propety value or null if not set.
+     * @return the property value or null if not set.
      */
     public Object getApplicationProperty(String key) {
         if (applicationPropertiesMap == null) {
@@ -560,6 +576,7 @@ public class AmqpMessage {
             message.setHeader(new Header());
         }
     }
+
     private void lazyCreateProperties() {
         if (message.getProperties() == null) {
             message.setProperties(new Properties());

http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 77a529d..999e033 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -422,12 +422,38 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
      * @throws IOException if an error occurs while sending the accept.
      */
     public void accept(final Delivery delivery) throws IOException {
+        accept(delivery, this.session);
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance.
+     *
+     * This method allows for the session that is used in the accept to be specified by the
+     * caller.  This allows for an accepted message to be involved in a transaction that is
+     * being managed by some other session other than the one that created this receiver.
+     *
+     * @param delivery
+     *        the Delivery instance to accept.
+     * @param session
+     *        the session under which the message is being accepted.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
         checkClosed();
 
         if (delivery == null) {
             throw new IllegalArgumentException("Delivery to accept cannot be null");
         }
 
+        if (session == null) {
+            throw new IllegalArgumentException("Session given cannot be null");
+        }
+
+        if (session.getConnection() != this.session.getConnection()) {
+            throw new IllegalArgumentException("The session used for accept must originate from the connection that created this receiver.");
+        }
+
         final ClientFuture request = new ClientFuture();
         session.getScheduler().execute(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index f9d6435..dd3a371 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -127,6 +127,21 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
      */
     public void send(final AmqpMessage message) throws IOException {
         checkClosed();
+        send(message, null);
+    }
+
+    /**
+     * Sends the given message to this senders assigned address using the supplied transaction ID.
+     *
+     * @param message
+     *        the message to send.
+     * @param txId
+     *        the transaction ID to assign the outgoing send.
+     *
+     * @throws IOException if an error occurs during the send.
+     */
+    public void send(final AmqpMessage message, final AmqpTransactionId txId) throws IOException {
+        checkClosed();
         final ClientFuture sendRequest = new ClientFuture();
 
         session.getScheduler().execute(new Runnable() {
@@ -134,7 +149,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
             @Override
             public void run() {
                 try {
-                    doSend(message, sendRequest);
+                    doSend(message, sendRequest, txId);
                     session.pumpToProtonTransport(sendRequest);
                 } catch (Exception e) {
                     sendRequest.onFailure(e);
@@ -319,7 +334,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
         }
     }
 
-    private void doSend(AmqpMessage message, AsyncResult request) throws Exception {
+    private void doSend(AmqpMessage message, AsyncResult request, AmqpTransactionId txId) throws Exception {
         LOG.trace("Producer sending message: {}", message);
 
         Delivery delivery = null;
@@ -332,8 +347,14 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
 
         delivery.setContext(request);
 
-        if (session.isInTransaction()) {
-            Binary amqpTxId = session.getTransactionId().getRemoteTxId();
+        Binary amqpTxId = null;
+        if (txId != null) {
+            amqpTxId = txId.getRemoteTxId();
+        } else if (session.isInTransaction()) {
+            amqpTxId = session.getTransactionId().getRemoteTxId();
+        }
+
+        if (amqpTxId != null) {
             TransactionalState state = new TransactionalState();
             state.setTxnId(amqpTxId);
             delivery.disposition(state);

http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index ae99f65..3804603 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -464,8 +464,12 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
         connection.pumpToProtonTransport(request);
     }
 
-    AmqpTransactionId getTransactionId() {
-        return txContext.getTransactionId();
+    public AmqpTransactionId getTransactionId() {
+        if (txContext != null && txContext.isInTransaction()) {
+            return txContext.getTransactionId();
+        }
+
+        return null;
     }
 
     AmqpTransactionContext getTransactionContext() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fa551498/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index a998290..97089a9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -178,4 +178,198 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
         sender.close();
         connection.close();
     }
+
+    @Test(timeout = 60000)
+    public void testMultipleSessionReceiversInSingleTXNWithCommit() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Load up the Queue with some messages
+        {
+            AmqpSession session = connection.createSession();
+            AmqpSender sender = session.createSender("queue://" + getTestName());
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message);
+            sender.send(message);
+            sender.send(message);
+            sender.close();
+        }
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Create some sender sessions
+        AmqpSession session1 = connection.createSession();
+        AmqpSession session2 = connection.createSession();
+        AmqpSession session3 = connection.createSession();
+
+        // Sender linked to each session
+        AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(3, queue.getQueueSize());
+
+        // Begin the transaction that all senders will operate in.
+        txnSession.begin();
+
+        assertTrue(txnSession.isInTransaction());
+
+        receiver1.flow(1);
+        receiver2.flow(1);
+        receiver3.flow(1);
+
+        AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
+
+        message1.accept(txnSession);
+        message2.accept(txnSession);
+        message3.accept(txnSession);
+
+        assertEquals(3, queue.getQueueSize());
+
+        txnSession.commit();
+
+        assertEquals(0, queue.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleSessionReceiversInSingleTXNWithRollback() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Load up the Queue with some messages
+        {
+            AmqpSession session = connection.createSession();
+            AmqpSender sender = session.createSender("queue://" + getTestName());
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message);
+            sender.send(message);
+            sender.send(message);
+            sender.close();
+        }
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Create some sender sessions
+        AmqpSession session1 = connection.createSession();
+        AmqpSession session2 = connection.createSession();
+        AmqpSession session3 = connection.createSession();
+
+        // Sender linked to each session
+        AmqpReceiver receiver1 = session1.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver2 = session2.createReceiver("queue://" + getTestName());
+        AmqpReceiver receiver3 = session3.createReceiver("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(3, queue.getQueueSize());
+
+        // Begin the transaction that all senders will operate in.
+        txnSession.begin();
+
+        assertTrue(txnSession.isInTransaction());
+
+        receiver1.flow(1);
+        receiver2.flow(1);
+        receiver3.flow(1);
+
+        AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message3 = receiver3.receive(5, TimeUnit.SECONDS);
+
+        message1.accept(txnSession);
+        message2.accept(txnSession);
+        message3.accept(txnSession);
+
+        assertEquals(3, queue.getQueueSize());
+
+        txnSession.rollback();
+
+        assertEquals(3, queue.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleSessionSendersInSingleTXNWithCommit() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Create some sender sessions
+        AmqpSession session1 = connection.createSession();
+        AmqpSession session2 = connection.createSession();
+        AmqpSession session3 = connection.createSession();
+
+        // Sender linked to each session
+        AmqpSender sender1 = session1.createSender("queue://" + getTestName());
+        AmqpSender sender2 = session2.createSender("queue://" + getTestName());
+        AmqpSender sender3 = session3.createSender("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(0, queue.getQueueSize());
+
+        // Begin the transaction that all senders will operate in.
+        txnSession.begin();
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+
+        assertTrue(txnSession.isInTransaction());
+
+        sender1.send(message, txnSession.getTransactionId());
+        sender2.send(message, txnSession.getTransactionId());
+        sender3.send(message, txnSession.getTransactionId());
+
+        assertEquals(0, queue.getQueueSize());
+
+        txnSession.commit();
+
+        assertEquals(3, queue.getQueueSize());
+    }
+
+    @Test(timeout = 60000)
+    public void testMultipleSessionSendersInSingleTXNWithRollback() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Create some sender sessions
+        AmqpSession session1 = connection.createSession();
+        AmqpSession session2 = connection.createSession();
+        AmqpSession session3 = connection.createSession();
+
+        // Sender linked to each session
+        AmqpSender sender1 = session1.createSender("queue://" + getTestName());
+        AmqpSender sender2 = session2.createSender("queue://" + getTestName());
+        AmqpSender sender3 = session3.createSender("queue://" + getTestName());
+
+        final QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(0, queue.getQueueSize());
+
+        // Begin the transaction that all senders will operate in.
+        txnSession.begin();
+
+        AmqpMessage message = new AmqpMessage();
+        message.setText("Test-Message");
+
+        assertTrue(txnSession.isInTransaction());
+
+        sender1.send(message, txnSession.getTransactionId());
+        sender2.send(message, txnSession.getTransactionId());
+        sender3.send(message, txnSession.getTransactionId());
+
+        assertEquals(0, queue.getQueueSize());
+
+        txnSession.rollback();
+
+        assertEquals(0, queue.getQueueSize());
+    }
 }


[10/14] activemq git commit: NO-JIRA: Small test client fix to close threads out faster.

Posted by ta...@apache.org.
NO-JIRA: Small test client fix to close threads out faster.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/92116612
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/92116612
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/92116612

Branch: refs/heads/activemq-5.14.x
Commit: 92116612446eefee2420c19d761153977dda00fb
Parents: 12ee866
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Sep 15 16:28:16 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:15:30 2016 -0400

----------------------------------------------------------------------
 .../activemq/transport/amqp/client/AmqpConnection.java       | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/92116612/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 1c795a8..3328044 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -226,7 +226,13 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
                     }
                 }
 
-                serializer.shutdown();
+                serializer.shutdownNow();
+                try {
+                    if (!serializer.awaitTermination(10, TimeUnit.SECONDS)) {
+                        LOG.warn("Serializer didn't shutdown cleanly");
+                    }
+                } catch (InterruptedException e) {
+                }
             }
         }
     }


[11/14] activemq git commit: NO-JIRA: Add some more variants of the .NET transaction tests

Posted by ta...@apache.org.
NO-JIRA: Add some more variants of the .NET transaction tests

Adds ability to not settle accepted messages on the client to enable
creation of tests that are equivalent to the AmqpNetLite client's
transaction tests which hold settlement and expect the resource to
handle it on successful discharge.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0bb76c7f
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0bb76c7f
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0bb76c7f

Branch: refs/heads/activemq-5.14.x
Commit: 0bb76c7fb42d49c50e69265e1c97c463f5fdbc58
Parents: 9211661
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 19 17:36:58 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:15:40 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      |  31 +++-
 .../transport/amqp/client/AmqpReceiver.java     |  44 +++++-
 .../amqp/interop/AmqpTransactionTest.java       | 153 +++++++++++++++++++
 3 files changed, 221 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 8b378e1..2b1b874 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -132,15 +132,28 @@ public class AmqpMessage {
      * @throws Exception if an error occurs during the accept.
      */
     public void accept() throws Exception {
+        accept(true);
+    }
+
+    /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @param settle
+     *      true if the client should also settle the delivery when sending the accept.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept(boolean settle) throws Exception {
         if (receiver == null) {
             throw new IllegalStateException("Can't accept non-received message.");
         }
 
-        receiver.accept(delivery);
+        receiver.accept(delivery, settle);
     }
 
     /**
-     * Accepts the message marking it as consumed on the remote peer.
+     * Accepts the message marking it as consumed on the remote peer.  This method
+     * will automatically settle the accepted delivery.
      *
      * @param session
      *      The session that is used to manage acceptance of the message.
@@ -148,11 +161,23 @@ public class AmqpMessage {
      * @throws Exception if an error occurs during the accept.
      */
     public void accept(AmqpSession txnSession) throws Exception {
+        accept(txnSession, true);
+    }
+
+    /**
+     * Accepts the message marking it as consumed on the remote peer.
+     *
+     * @param session
+     *      The session that is used to manage acceptance of the message.
+     *
+     * @throws Exception if an error occurs during the accept.
+     */
+    public void accept(AmqpSession txnSession, boolean settle) throws Exception {
         if (receiver == null) {
             throw new IllegalStateException("Can't accept non-received message.");
         }
 
-        receiver.accept(delivery, txnSession);
+        receiver.accept(delivery, txnSession, settle);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 999e033..3543ae3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -414,20 +414,34 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     }
 
     /**
-     * Accepts a message that was dispatched under the given Delivery instance.
+     * Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
      *
      * @param delivery
      *        the Delivery instance to accept.
      *
      * @throws IOException if an error occurs while sending the accept.
      */
-    public void accept(final Delivery delivery) throws IOException {
-        accept(delivery, this.session);
+    public void accept(Delivery delivery) throws IOException {
+        accept(delivery, this.session, true);
     }
 
     /**
      * Accepts a message that was dispatched under the given Delivery instance.
      *
+     * @param delivery
+     *        the Delivery instance to accept.
+     * @param settle
+     *        true if the receiver should settle the delivery or just send the disposition.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(Delivery delivery, boolean settle) throws IOException {
+        accept(delivery, this.session, settle);
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance and settles the delivery.
+     *
      * This method allows for the session that is used in the accept to be specified by the
      * caller.  This allows for an accepted message to be involved in a transaction that is
      * being managed by some other session other than the one that created this receiver.
@@ -440,6 +454,26 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
      * @throws IOException if an error occurs while sending the accept.
      */
     public void accept(final Delivery delivery, final AmqpSession session) throws IOException {
+        accept(delivery, session, true);
+    }
+
+    /**
+     * Accepts a message that was dispatched under the given Delivery instance.
+     *
+     * This method allows for the session that is used in the accept to be specified by the
+     * caller.  This allows for an accepted message to be involved in a transaction that is
+     * being managed by some other session other than the one that created this receiver.
+     *
+     * @param delivery
+     *        the Delivery instance to accept.
+     * @param session
+     *        the session under which the message is being accepted.
+     * @param settle
+     *        true if the receiver should settle the delivery or just send the disposition.
+     *
+     * @throws IOException if an error occurs while sending the accept.
+     */
+    public void accept(final Delivery delivery, final AmqpSession session, final boolean settle) throws IOException {
         checkClosed();
 
         if (delivery == null) {
@@ -469,11 +503,13 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
                                 txState.setOutcome(Accepted.getInstance());
                                 txState.setTxnId(txnId);
                                 delivery.disposition(txState);
-                                delivery.settle();
                                 session.getTransactionContext().registerTxConsumer(AmqpReceiver.this);
                             }
                         } else {
                             delivery.disposition(Accepted.getInstance());
+                        }
+
+                        if (settle) {
                             delivery.settle();
                         }
                     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/0bb76c7f/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index 7cf6026..994a2e7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -574,4 +575,156 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         connection.close();
     }
+
+    // TODO - Direct ports of the AmqpNetLite client tests that don't currently with this broker.
+
+    @Ignore("Fails due to no support for TX enrollment without settlement.")
+    @Test(timeout = 60000)
+    public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+        receiver.flow((NUM_MESSAGES + 2) * 2);
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            messages.add(message);
+        }
+
+        // Commit half the consumed messages
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            messages.get(i).accept(txnSession, false);
+        }
+        txnSession.commit();
+
+        // Rollback the other half the consumed messages
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession, false);
+        }
+        txnSession.rollback();
+
+        // After rollback message should still be acquired so we read last sent message.
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+        }
+
+        // Commit the other half the consumed messages
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+        }
+        txnSession.commit();
+
+        // The final message should still be pending.
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            receiver.flow(1);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.accept();
+        }
+
+        // We should have now drained the Queue
+        AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+        receiver.flow(1);
+        assertNull(message);
+
+        connection.close();
+    }
+
+    @Ignore("Fails due to no support for TX enrollment without settlement.")
+    @Test(timeout = 60000)
+    public void testCommitAndRollbackWithMultipleSessionsInSingleTXNoSettlement() throws Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(2);
+        AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
+
+        // Accept the first one in a TXN and send a new message in that TXN as well
+        txnSession.begin();
+        {
+            message1.accept(txnSession, false);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES);
+
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        // Accept the second one in a TXN and send a new message in that TXN as well but rollback
+        txnSession.begin();
+        {
+            message2.accept(txnSession, false);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.rollback();
+
+        message2.release();
+
+        // Should be two message available for dispatch given that we sent and committed one, and
+        // releases another we had previously received.
+        receiver.flow(2);
+        for (int i = 1; i <= NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(i, message.getApplicationProperty("msgId"));
+            message.accept();
+        }
+
+        // Should be nothing left.
+        receiver.flow(1);
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        connection.close();
+    }
 }


[09/14] activemq git commit: NO-JIRA: Add some additional tests ported from the .NET AMQP client

Posted by ta...@apache.org.
NO-JIRA: Add some additional tests ported from the .NET AMQP client

Adds some transaction tests ported from AMQP .NET client with some
variances based on the way the test client works and limitations in the
brokers handling of Transacted sends.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/12ee866a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/12ee866a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/12ee866a

Branch: refs/heads/activemq-5.14.x
Commit: 12ee866a6e1b72b53fa7a1de4b300098c92375ae
Parents: fa55149
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Sep 15 13:24:18 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:15:18 2016 -0400

----------------------------------------------------------------------
 .../amqp/interop/AmqpTransactionTest.java       | 202 +++++++++++++++++++
 1 file changed, 202 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/12ee866a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
index 97089a9..7cf6026 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpTransactionTest.java
@@ -18,8 +18,10 @@ package org.apache.activemq.transport.amqp.interop;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
@@ -372,4 +374,204 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
         assertEquals(0, queue.getQueueSize());
     }
+
+    //----- Tests Ported from AmqpNetLite client -----------------------------//
+
+    @Test(timeout = 60000)
+    public void testSendersCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+        final int NUM_MESSAGES = 5;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        // Commit TXN work from a sender.
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        // Rollback an additional batch of TXN work from a sender.
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.rollback();
+
+        // Commit more TXN work from a sender.
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(NUM_MESSAGES * 2);
+        for (int i = 0; i < NUM_MESSAGES * 2; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept(txnSession);
+        }
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testReceiversCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
+        receiver.flow((NUM_MESSAGES + 2) * 2);
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            messages.add(message);
+        }
+
+        // Commit half the consumed messages
+        txnSession.begin();
+        for (int i = 0; i < NUM_MESSAGES / 2; ++i) {
+            messages.get(i).accept(txnSession);
+        }
+        txnSession.commit();
+
+        // Rollback the other half the consumed messages
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            messages.get(i).accept(txnSession);
+        }
+        txnSession.rollback();
+
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+        }
+
+        // Commit the other half the consumed messages
+        // This is a variation from the .NET client tests which doesn't settle the
+        // messages in the TX until commit is called but on ActiveMQ they will be
+        // redispatched regardless and not stay in the acquired state.
+        txnSession.begin();
+        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            message.accept();
+        }
+        txnSession.commit();
+
+        // The final message should still be pending.
+        {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            receiver.flow(1);
+            assertNotNull(message);
+            assertEquals(NUM_MESSAGES, message.getApplicationProperty("msgId"));
+            message.release();
+        }
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCommitAndRollbackWithMultipleSessionsInSingleTX() throws Exception {
+        final int NUM_MESSAGES = 10;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+
+        // Root TXN session controls all TXN send lifetimes.
+        AmqpSession txnSession = connection.createSession();
+
+        // Normal Session which won't create an TXN itself
+        AmqpSession session = connection.createSession();
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < NUM_MESSAGES; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", i);
+            sender.send(message, txnSession.getTransactionId());
+        }
+
+        // Read all messages from the Queue, do not accept them yet.
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+        receiver.flow(2);
+        AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
+        AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
+
+        // Accept the first one in a TXN and send a new message in that TXN as well
+        txnSession.begin();
+        {
+            message1.accept(txnSession);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES);
+
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.commit();
+
+        // Accept the second one in a TXN and send a new message in that TXN as well but rollback
+        txnSession.begin();
+        {
+            message2.accept(txnSession);
+
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message");
+            message.setApplicationProperty("msgId", NUM_MESSAGES + 1);
+            sender.send(message, txnSession.getTransactionId());
+        }
+        txnSession.rollback();
+
+        // Variation here from .NET code, the client settles the accepted message where
+        // the .NET client does not and instead releases here to have it redelivered.
+
+        receiver.flow(NUM_MESSAGES);
+        for (int i = 1; i <= NUM_MESSAGES; ++i) {
+            AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            assertEquals(i, message.getApplicationProperty("msgId"));
+            message.accept();
+        }
+
+        // Should be nothing left.
+        assertNull(receiver.receive(1, TimeUnit.SECONDS));
+
+        connection.close();
+    }
 }


[13/14] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6422 - move delivery tracking to pumpoutbound and additional test that shows how the presettle case breaks. Thanks to Robbie Gemmell for the feedback

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6422 - move delivery tracking to pumpoutbound and additional test that shows how the presettle case breaks. Thanks to Robbie Gemmell for the feedback


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8e6fe414
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8e6fe414
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8e6fe414

Branch: refs/heads/activemq-5.14.x
Commit: 8e6fe414ad9078e765d3f8579dcd2898636b9ea1
Parents: ebbb7ab
Author: gtully <ga...@gmail.com>
Authored: Wed Sep 21 13:59:45 2016 +0100
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Sep 27 12:16:05 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpSender.java     |  2 +-
 .../amqp/interop/AmqpReceiverTest.java          | 41 ++++++++++++++++++++
 2 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6fe414/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 75f2371..455e0b0 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -292,7 +292,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         }
 
         pumpOutbound();
-        logicalDeliveryCount++;
     }
 
     @Override
@@ -410,6 +409,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                         }
                         currentBuffer = null;
                         currentDelivery = null;
+                        logicalDeliveryCount++;
                     }
                 } else {
                     return;

http://git-wip-us.apache.org/repos/asf/activemq/blob/8e6fe414/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index c68e850..b73f087 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -225,6 +225,47 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
 
     @Test(timeout = 60000)
     @Repeat(repetitions = 1)
+    public void testPresettledReceiverReadsAllMessagesInNonFlowBatch() throws Exception {
+        final int MSG_COUNT = 100;
+        sendMessages(getTestName(), MSG_COUNT, false);
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
+
+        QueueViewMBean queueView = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queueView.getQueueSize());
+        assertEquals(0, queueView.getDispatchCount());
+
+        receiver.flow(20);
+        // consume less that flow
+        for (int j=0;j<10;j++) {
+            assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        }
+
+        // flow more and consume all
+        receiver.flow(10);
+        for (int j=0;j<20;j++) {
+            assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        }
+
+        // remainder
+        receiver.flow(70);
+        for (int j=0;j<70;j++) {
+            assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+        }
+
+        receiver.close();
+
+        assertEquals(0, queueView.getQueueSize());
+
+        connection.close();
+    }
+
+    @Test(timeout = 60000)
+    @Repeat(repetitions = 1)
     public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
         int MSG_COUNT = 4;
         sendMessages(getTestName(), MSG_COUNT, false);