You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/10/21 14:53:00 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5379 - AMQP prefetch: split producer and consumer prefetch settings; allow consumer prefetch to be adjusted using link credit

Repository: activemq
Updated Branches:
  refs/heads/trunk 3873ecfe5 -> ab3de0c4c


https://issues.apache.org/jira/browse/AMQ-5379 - AMQP prefetch: split producer and consumer prefetch settings; allow consumer prefetch to be adjusted using link credit


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ab3de0c4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ab3de0c4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ab3de0c4

Branch: refs/heads/trunk
Commit: ab3de0c4c2b8af83090558ebbea4ef91ce04024b
Parents: 3873ecf
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Tue Oct 21 14:52:26 2014 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Tue Oct 21 14:52:39 2014 +0200

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         |  7 ++
 .../transport/amqp/AmqpProtocolConverter.java   | 69 ++++++++++----------
 .../transport/amqp/AmqpTransportFilter.java     |  4 ++
 .../transport/amqp/IAmqpProtocolConverter.java  |  2 +
 4 files changed, 48 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ab3de0c4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index 5eefbb2..09478d7 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -31,6 +31,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
 
     final private AmqpTransport transport;
     private int prefetch = DEFAULT_PREFETCH;
+    private int producerCredit = DEFAULT_PREFETCH;
 
     interface Discriminator {
         boolean matches(AmqpHeader header);
@@ -85,6 +86,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
             }
             IAmqpProtocolConverter next = match.create(transport);
             next.setPrefetch(prefetch);
+            next.setProducerCredit(producerCredit);
             transport.setProtocolConverter(next);
             for (Command send : pendingCommands) {
                 next.onActiveMQCommand(send);
@@ -113,4 +115,9 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
     public void setPrefetch(int prefetch) {
         this.prefetch = prefetch;
     }
+
+    @Override
+    public void setProducerCredit(int producerCredit) {
+        this.producerCredit = producerCredit;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ab3de0c4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 80b47cc..d9bfaa3 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -30,31 +30,7 @@ import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidSelectorException;
 
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
@@ -122,6 +98,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
     protected int prefetch;
+    protected int producerCredit;
     protected Transport protonTransport = Proton.transport();
     protected Connection protonConnection = Proton.connection();
     protected Collector eventCollector = new CollectorImpl();
@@ -296,8 +273,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                             processLinkEvent(event.getLink());
                             break;
                         case LINK_FLOW:
-                            Link link = event.getLink();
-                            ((AmqpDeliveryListener) link.getContext()).drainCheck();
+                            processLinkFlow(event.getLink());
                             break;
                         case DELIVERY:
                             processDelivery(event.getDelivery());
@@ -317,6 +293,25 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
     }
 
+    protected void processLinkFlow(Link link) throws Exception {
+        Object context = link.getContext();
+        int credit = link.getRemoteCredit();
+        if (context != null && context instanceof ConsumerContext) {
+            ConsumerContext consumerContext = (ConsumerContext)context;
+            // change ActiveMQ consumer prefetch if needed
+            if (consumerContext.credit == 0 && consumerContext.consumerPrefetch != credit && credit > 0) {
+                ConsumerControl control = new ConsumerControl();
+                control.setConsumerId(consumerContext.consumerId);
+                control.setDestination(consumerContext.destination);
+                control.setPrefetch(credit);
+                consumerContext.consumerPrefetch = credit;
+                sendToActiveMQ(control, null);
+            }
+            consumerContext.credit = credit;
+        }
+        ((AmqpDeliveryListener) link.getContext()).drainCheck();
+    }
+
     protected void processConnectionEvent(Connection connection) throws Exception {
         EndpointState remoteState = connection.getRemoteState();
         if (remoteState == EndpointState.ACTIVE) {
@@ -828,7 +823,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
         // Client is producing to this receiver object
         org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
-        int flow = prefetch;
+        int flow = producerCredit;
         // use client's preference if set
         if (receiver.getRemoteCredit() != 0) {
             flow = receiver.getRemoteCredit();
@@ -923,6 +918,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         private boolean closed;
         public ConsumerInfo info;
         private boolean endOfBrowse = false;
+        public ActiveMQDestination destination;
+        public int credit;
+        public int consumerPrefetch;
 
         protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
 
@@ -1317,12 +1315,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             consumerInfo.setSelector(selector);
             consumerInfo.setNoRangeAcks(true);
             consumerInfo.setDestination(dest);
-            // use client's preference if set
-            if (sender.getRemoteCredit() != 0) {
-                consumerInfo.setPrefetchSize(sender.getRemoteCredit());
-            } else {
-                consumerInfo.setPrefetchSize(prefetch);
-            }
+            consumerContext.destination = dest;
+            consumerInfo.setPrefetchSize(sender.getRemoteCredit());
+            consumerContext.credit = sender.getRemoteCredit();
+            consumerContext.consumerPrefetch = consumerInfo.getPrefetchSize();
             consumerInfo.setDispatchAsync(true);
             if (source.getDistributionMode() == COPY && dest.isQueue()) {
                 consumerInfo.setBrowser(true);
@@ -1441,4 +1437,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     public void setPrefetch(int prefetch) {
         this.prefetch = prefetch;
     }
+
+    @Override
+    public void setProducerCredit(int producerCredit) {
+        this.producerCredit = producerCredit;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ab3de0c4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index 41256c6..ec63ae7 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -180,4 +180,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     public void setPrefetch(int prefetch) {
         protocolConverter.setPrefetch(prefetch);
     }
+
+    public void setProducerCredit(int producerCredit) {
+        protocolConverter.setProducerCredit(producerCredit);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ab3de0c4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
index e3621bf..3e365ae 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
@@ -33,4 +33,6 @@ public interface IAmqpProtocolConverter {
     void updateTracer();
 
     void setPrefetch(int prefetch);
+
+    void setProducerCredit(int producerCredit);
 }