You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/13 23:20:32 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5661

Repository: activemq
Updated Branches:
  refs/heads/master 72839b78a -> 6a6ef45ee


https://issues.apache.org/jira/browse/AMQ-5661

Always honor the link credit as the true prefetch value for the
subscription.  Enables a previously failing test to pass.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6a6ef45e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6a6ef45e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6a6ef45e

Branch: refs/heads/master
Commit: 6a6ef45ee04d332ed3905f79ea87527fa6264d94
Parents: 72839b7
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 13 18:20:26 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 13 18:20:26 2015 -0400

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         |  7 ---
 .../transport/amqp/AmqpProtocolConverter.java   | 46 +++++---------------
 .../transport/amqp/AmqpTransportFilter.java     |  5 ++-
 .../transport/amqp/IAmqpProtocolConverter.java  |  2 -
 .../amqp/interop/AmqpReceiverTest.java          |  1 -
 5 files changed, 16 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index b484500..f5b457b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -33,7 +33,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
     private final AmqpTransport transport;
     private final BrokerService brokerService;
 
-    private int prefetch = 0;
     private int producerCredit = DEFAULT_PREFETCH;
 
     interface Discriminator {
@@ -90,7 +89,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
             }
 
             IAmqpProtocolConverter next = match.create(transport, brokerService);
-            next.setPrefetch(prefetch);
             next.setProducerCredit(producerCredit);
             transport.setProtocolConverter(next);
             for (Command send : pendingCommands) {
@@ -117,11 +115,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
     }
 
     @Override
-    public void setPrefetch(int prefetch) {
-        this.prefetch = prefetch;
-    }
-
-    @Override
     public void setProducerCredit(int producerCredit) {
         this.producerCredit = producerCredit;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 39c8c2b..3661f3d 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -155,7 +155,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private final BrokerService brokerService;
     private AuthenticationBroker authenticator;
 
-    protected int prefetch;
     protected int producerCredit;
     protected Transport protonTransport = Proton.transport();
     protected Connection protonConnection = Proton.connection();
@@ -410,17 +409,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         int credit = link.getCredit();
         if (context instanceof ConsumerContext) {
             ConsumerContext consumerContext = (ConsumerContext)context;
-            // change consumer prefetch if it's not been already set using
-            // transport connector property or consumer preference
-            if (consumerContext.consumerPrefetch == 0 && credit > 0) {
+
+            if (credit != consumerContext.credit) {
+                consumerContext.credit = credit >= 0 ? credit : 0;
                 ConsumerControl control = new ConsumerControl();
                 control.setConsumerId(consumerContext.consumerId);
                 control.setDestination(consumerContext.destination);
-                control.setPrefetch(credit);
-                consumerContext.consumerPrefetch = credit;
+                control.setPrefetch(consumerContext.credit);
                 sendToActiveMQ(control, null);
             }
-            consumerContext.credit = credit;
         }
         ((AmqpDeliveryListener) link.getContext()).drainCheck();
     }
@@ -1061,7 +1058,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         public ConsumerInfo info;
         private boolean endOfBrowse = false;
         public int credit;
-        public int consumerPrefetch = 0;
         private long lastDeliveredSequenceId;
 
         protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
@@ -1481,33 +1477,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 destination = createDestination(source);
             }
 
+            int senderCredit = sender.getRemoteCredit();
+
             subscriptionsByConsumerId.put(id, consumerContext);
             ConsumerInfo consumerInfo = new ConsumerInfo(id);
-            consumerContext.info = consumerInfo;
             consumerInfo.setSelector(selector);
             consumerInfo.setNoRangeAcks(true);
             consumerInfo.setDestination(destination);
-            consumerContext.setDestination(destination);
-            int senderCredit = sender.getRemoteCredit();
-            if (prefetch != 0) {
-                // use the value configured on the transport connector
-                // this value will not be changed to the consumer's preference
-                consumerInfo.setPrefetchSize(prefetch);
-                consumerContext.consumerPrefetch = prefetch;
-            } else {
-                if (senderCredit != 0) {
-                    // set the prefetch to the value of the remote credit
-                    // and ignore the later changes
-                    consumerInfo.setPrefetchSize(senderCredit);
-                    consumerContext.consumerPrefetch = senderCredit;
-                } else {
-                    // set zero value for now and change to the consumer's preference
-                    // on the first flow packet
-                    consumerInfo.setPrefetchSize(0);
-                }
-            }
-            consumerContext.credit = senderCredit;
+            consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
             consumerInfo.setDispatchAsync(true);
+
             if (source.getDistributionMode() == COPY && destination.isQueue()) {
                 consumerInfo.setBrowser(true);
             }
@@ -1521,6 +1500,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 consumerInfo.setNoLocal(true);
             }
 
+            consumerContext.info = consumerInfo;
+            consumerContext.setDestination(destination);
+            consumerContext.credit = senderCredit;
+
             sendToActiveMQ(consumerInfo, new ResponseHandler() {
                 @Override
                 public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
@@ -1657,11 +1640,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     }
 
     @Override
-    public void setPrefetch(int prefetch) {
-        this.prefetch = prefetch;
-    }
-
-    @Override
     public void setProducerCredit(int producerCredit) {
         this.producerCredit = producerCredit;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index fb7542b..5dfdf75 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -187,8 +187,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
         this.protocolConverter = protocolConverter;
     }
 
+    /**
+     * @deprecated AMQP receiver configures its prefetch via flow, remove on next release.
+     */
+    @Deprecated
     public void setPrefetch(int prefetch) {
-        protocolConverter.setPrefetch(prefetch);
     }
 
     public void setProducerCredit(int producerCredit) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
index 3e365ae..8296ef2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
@@ -32,7 +32,5 @@ public interface IAmqpProtocolConverter {
 
     void updateTracer();
 
-    void setPrefetch(int prefetch);
-
     void setProducerCredit(int producerCredit);
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 1245811..1bc3d66 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -141,7 +141,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
         connection.close();
     }
 
-    @Ignore("Fails due to issues with accept and no credit")
     @Test(timeout = 60000)
     public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
         int MSG_COUNT = 4;