You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/27 20:11:57 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5413 https://issues.apache.org/jira/browse/AMQ-5433 https://issues.apache.org/jira/browse/AMQ-5647 https://issues.apache.org/jira/browse/AMQ-5684

Repository: activemq
Updated Branches:
  refs/heads/master b58565019 -> 05ff52dc1


https://issues.apache.org/jira/browse/AMQ-5413
https://issues.apache.org/jira/browse/AMQ-5433
https://issues.apache.org/jira/browse/AMQ-5647
https://issues.apache.org/jira/browse/AMQ-5684

Adds support for AMQP drain and fixes some issues around incorrect
dispatching and credit handling.  Should resolve several issues that
have been seen using test suites from AmqpNetLite and other AMQP
clients.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/05ff52dc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/05ff52dc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/05ff52dc

Branch: refs/heads/master
Commit: 05ff52dc15ed1229972c0c6000b6b1aba789d2ad
Parents: b585650
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 27 15:11:38 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 27 15:11:38 2015 -0400

----------------------------------------------------------------------
 .../message/AMQPNativeOutboundTransformer.java  |  5 ++
 .../transport/amqp/protocol/AmqpSender.java     | 50 ++++++++++++--------
 .../activemq/transport/amqp/IDERunner.java      | 27 +++++++----
 .../amqp/interop/AmqpReceiverTest.java          |  3 +-
 .../broker/region/PrefetchSubscription.java     | 23 ++++-----
 .../broker/region/TopicSubscription.java        | 20 ++++----
 .../apache/activemq/command/MessagePull.java    | 18 +++++++
 7 files changed, 95 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/05ff52dc/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index 9d008bf..19573f9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -24,6 +24,7 @@ import javax.jms.Message;
 import javax.jms.MessageFormatException;
 
 import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.codec.CompositeWritableBuffer;
 import org.apache.qpid.proton.codec.DroppingWritableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
@@ -85,6 +86,10 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
                 // Update the DeliveryCount header...
                 // The AMQP delivery-count field only includes prior failed delivery attempts,
                 // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1.
+                if (amqp.getHeader() == null) {
+                    amqp.setHeader(new Header());
+                }
+
                 amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
 
                 // Re-encode...

http://git-wip-us.apache.org/repos/asf/activemq/blob/05ff52dc/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 75b6758..0096334 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -30,6 +30,7 @@ import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessagePull;
 import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
@@ -78,8 +79,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final boolean presettle;
 
     private boolean closed;
-    private boolean endOfBrowse;
     private int currentCredit;
+    private boolean draining;
     private long lastDeliveredSequenceId;
 
     private Buffer currentBuffer;
@@ -151,7 +152,31 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     public void flow() throws Exception {
         int updatedCredit = getEndpoint().getCredit();
 
-        if (updatedCredit != currentCredit) {
+        LOG.trace("Flow: drain={} credit={}, remoteCredit={}",
+                  getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit());
+
+        if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
+            currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
+            draining = true;
+
+            // Revert to a pull consumer.
+            ConsumerControl control = new ConsumerControl();
+            control.setConsumerId(getConsumerId());
+            control.setDestination(getDestination());
+            control.setPrefetch(0);
+            sendToActiveMQ(control, null);
+
+            // Now request dispatch of the drain amount, we request immediate
+            // timeout and an completion message regardless so that we can know
+            // when we should marked the link as drained.
+            MessagePull pullRequest = new MessagePull();
+            pullRequest.setConsumerId(getConsumerId());
+            pullRequest.setDestination(getDestination());
+            pullRequest.setTimeout(-1);
+            pullRequest.setAlwaysSignalDone(true);
+            pullRequest.setQuantity(currentCredit);
+            sendToActiveMQ(pullRequest, null);
+        } else if (updatedCredit != currentCredit) {
             currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
             ConsumerControl control = new ConsumerControl();
             control.setConsumerId(getConsumerId());
@@ -159,8 +184,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             control.setPrefetch(currentCredit);
             sendToActiveMQ(control, null);
         }
-
-        drainCheck();
     }
 
     @Override
@@ -357,9 +380,10 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
                 final ActiveMQMessage jms = temp;
                 if (jms == null) {
-                    // It's the end of browse signal.
-                    endOfBrowse = true;
-                    drainCheck();
+                    LOG.info("End of browse signals endpoint drained.");
+                    // It's the end of browse signal in response to a MessagePull
+                    getEndpoint().drained();
+                    draining = false;
                 } else {
                     jms.setRedeliveryCounter(md.getRedeliveryCounter());
                     jms.setReadOnlyBody(true);
@@ -436,16 +460,4 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
             });
         }
     }
-
-    private void drainCheck() {
-        // If we are a browser.. lets not say we are drained until
-        // we hit the end of browse message.
-        if (consumerInfo.isBrowser() && !endOfBrowse) {
-            return;
-        }
-
-        if (outbound.isEmpty()) {
-            getEndpoint().drained();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/05ff52dc/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java
index af07e20..11b5c71 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/IDERunner.java
@@ -16,22 +16,31 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.io.File;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.store.kahadb.KahaDBStore;
-// import org.apache.activemq.leveldb.LevelDBStore;
-
-import java.io.File;
 
 public class IDERunner {
 
+    private static final String AMQP_TRANSFORMER = "jms";
+    private static final boolean TRANSPORT_TRACE = true;
+
     public static void main(String[]args) throws Exception {
-        BrokerService bs = new BrokerService();
-        bs.addConnector("amqp://0.0.0.0:5672?trace=true");
+        BrokerService brokerService = new BrokerService();
+
+        brokerService.addConnector(
+            "amqp://0.0.0.0:5672?trace=" + TRANSPORT_TRACE + "&transport.transformer=" + AMQP_TRANSFORMER);
+
         KahaDBStore store = new KahaDBStore();
         store.setDirectory(new File("target/activemq-data/kahadb"));
-        bs.setPersistenceAdapter(store);
-        bs.deleteAllMessages();
-        bs.start();
-        bs.waitUntilStopped();
+
+        brokerService.setStoreOpenWireVersion(10);
+        brokerService.setPersistenceAdapter(store);
+        brokerService.setUseJmx(false);
+        brokerService.deleteAllMessages();
+
+        brokerService.start();
+        brokerService.waitUntilStopped();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/05ff52dc/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 13b5904..b1ca527 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -43,7 +43,6 @@ import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.engine.Receiver;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -340,7 +339,7 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-    @Ignore("Test fails currently due to improper implementation of drain.")
+    //@Ignore("Test fails currently due to improper implementation of drain.")
     @Test(timeout = 60000)
     public void testReceiverCanDrainMessages() throws Exception {
         int MSG_COUNT = 20;

http://git-wip-us.apache.org/repos/asf/activemq/blob/05ff52dc/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index b101d72..bc815e5 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -88,13 +88,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
      * Allows a message to be pulled on demand by a client
      */
     @Override
-    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
         // The slave should not deliver pull messages.
         // TODO: when the slave becomes a master, He should send a NULL message to all the
         // consumers to 'wake them up' in case they were waiting for a message.
         if (getPrefetchSize() == 0) {
-
-            prefetchExtension.incrementAndGet();
+            prefetchExtension.set(pull.getQuantity());
             final long dispatchCounterBeforePull = dispatchCounter;
 
             // Have the destination push us some messages.
@@ -105,10 +104,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
             synchronized(this) {
                 // If there was nothing dispatched.. we may need to setup a timeout.
-                if (dispatchCounterBeforePull == dispatchCounter) {
+                if (dispatchCounterBeforePull == dispatchCounter || pull.isAlwaysSignalDone()) {
                     // immediate timeout used by receiveNoWait()
                     if (pull.getTimeout() == -1) {
-                        // Send a NULL message.
+                        // Null message indicates the pull is done or did not have pending.
+                        prefetchExtension.set(1);
                         add(QueueMessageReference.NULL_MESSAGE);
                         dispatchPending();
                     }
@@ -116,7 +116,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         scheduler.executeAfterDelay(new Runnable() {
                             @Override
                             public void run() {
-                                pullTimeout(dispatchCounterBeforePull);
+                                pullTimeout(dispatchCounterBeforePull, pull.isAlwaysSignalDone());
                             }
                         }, pull.getTimeout());
                     }
@@ -130,14 +130,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
      * Occurs when a pull times out. If nothing has been dispatched since the
      * timeout was setup, then send the NULL message.
      */
-    final void pullTimeout(long dispatchCounterBeforePull) {
+    final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
         synchronized (pendingLock) {
-            if (dispatchCounterBeforePull == dispatchCounter) {
+            if (dispatchCounterBeforePull == dispatchCounter || alwaysSignalDone) {
                 try {
+                    prefetchExtension.set(1);
                     add(QueueMessageReference.NULL_MESSAGE);
                     dispatchPending();
                 } catch (Exception e) {
                     context.getConnection().serviceException(e);
+                } finally {
+                    prefetchExtension.set(0);
                 }
             }
         }
@@ -147,7 +150,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
     public void add(MessageReference node) throws Exception {
         synchronized (pendingLock) {
             // The destination may have just been removed...
-            if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
+            if (!destinations.contains(node.getRegionDestination()) && node != QueueMessageReference.NULL_MESSAGE) {
                 // perhaps we should inform the caller that we are no longer valid to dispatch to?
                 return;
             }
@@ -213,7 +216,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
                 // Acknowledge all dispatched messages up till the message id of
                 // the acknowledgment.
-                int index = 0;
                 boolean inAckRange = false;
                 List<MessageReference> removeList = new ArrayList<MessageReference>();
                 for (final MessageReference node : dispatched) {
@@ -231,7 +233,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         } else {
                             registerRemoveSync(context, node);
                         }
-                        index++;
                         acknowledge(context, ack, node);
                         if (ack.getLastMessageId().equals(messageId)) {
                             destination = (Destination) node.getRegionDestination();

http://git-wip-us.apache.org/repos/asf/activemq/blob/05ff52dc/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 8db7c62..e81be74 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -326,23 +326,23 @@ public class TopicSubscription extends AbstractSubscription {
     }
 
     @Override
-    public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
+    public Response pullMessage(ConnectionContext context, final MessagePull pull) throws Exception {
 
         // The slave should not deliver pull messages.
-        if (getPrefetchSize() == 0 ) {
+        if (getPrefetchSize() == 0) {
 
             final long currentDispatchedCount = dispatchedCounter.get();
-            prefetchExtension.incrementAndGet();
+            prefetchExtension.set(pull.getQuantity());
             dispatchMatched();
 
             // If there was nothing dispatched.. we may need to setup a timeout.
-            if (currentDispatchedCount == dispatchedCounter.get()) {
+            if (currentDispatchedCount == dispatchedCounter.get() || pull.isAlwaysSignalDone()) {
 
                 // immediate timeout used by receiveNoWait()
                 if (pull.getTimeout() == -1) {
-                    prefetchExtension.decrementAndGet();
                     // Send a NULL message to signal nothing pending.
                     dispatch(null);
+                    prefetchExtension.set(0);
                 }
 
                 if (pull.getTimeout() > 0) {
@@ -350,7 +350,7 @@ public class TopicSubscription extends AbstractSubscription {
 
                         @Override
                         public void run() {
-                            pullTimeout(currentDispatchedCount);
+                            pullTimeout(currentDispatchedCount, pull.isAlwaysSignalDone());
                         }
                     }, pull.getTimeout());
                 }
@@ -363,15 +363,15 @@ public class TopicSubscription extends AbstractSubscription {
      * Occurs when a pull times out. If nothing has been dispatched since the
      * timeout was setup, then send the NULL message.
      */
-    private final void pullTimeout(long currentDispatchedCount) {
+    private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
         synchronized (matchedListMutex) {
-            if (currentDispatchedCount == dispatchedCounter.get()) {
+            if (currentDispatchedCount == dispatchedCounter.get() || alwaysSendDone) {
                 try {
                     dispatch(null);
                 } catch (Exception e) {
                     context.getConnection().serviceException(e);
                 } finally {
-                    prefetchExtension.decrementAndGet();
+                    prefetchExtension.set(0);
                 }
             }
         }
@@ -583,7 +583,7 @@ public class TopicSubscription extends AbstractSubscription {
     }
 
     private void dispatch(final MessageReference node) throws IOException {
-        Message message = node.getMessage();
+        Message message = node != null ? node.getMessage() : null;
         if (node != null) {
             node.incrementReferenceCount();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/05ff52dc/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java b/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
index e39aeae..57a25f9 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/MessagePull.java
@@ -35,6 +35,8 @@ public class MessagePull extends BaseCommand {
     private MessageId messageId;
     private String correlationId;
 
+    private transient int quantity = 1;
+    private transient boolean alwaysSignalDone;
     private transient boolean tracked = false;
 
     @Override
@@ -124,4 +126,20 @@ public class MessagePull extends BaseCommand {
     public boolean isTracked() {
         return this.tracked;
     }
+
+    public int getQuantity() {
+        return quantity;
+    }
+
+    public void setQuantity(int quantity) {
+        this.quantity = quantity;
+    }
+
+    public boolean isAlwaysSignalDone() {
+        return alwaysSignalDone;
+    }
+
+    public void setAlwaysSignalDone(boolean alwaysSignalDone) {
+        this.alwaysSignalDone = alwaysSignalDone;
+    }
 }