You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/10/02 11:34:20 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5379 - amqp prefetch size

Repository: activemq
Updated Branches:
  refs/heads/trunk fc3d90e8b -> 838bbebee


https://issues.apache.org/jira/browse/AMQ-5379 - amqp prefetch size


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/838bbebe
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/838bbebe
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/838bbebe

Branch: refs/heads/trunk
Commit: 838bbebeeba7e217cafa0f081d125efdc9faf0ad
Parents: fc3d90e
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Oct 2 11:32:31 2014 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Oct 2 11:32:31 2014 +0200

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         |  9 ++++++++
 .../transport/amqp/AmqpProtocolConverter.java   | 22 ++++++++++++++++----
 .../transport/amqp/AmqpTransportFilter.java     |  4 ++++
 .../transport/amqp/IAmqpProtocolConverter.java  |  2 ++
 4 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/838bbebe/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index 58da746..5eefbb2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -27,7 +27,10 @@ import org.apache.activemq.command.Command;
  */
 public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
 
+    private static final int DEFAULT_PREFETCH = 100;
+
     final private AmqpTransport transport;
+    private int prefetch = DEFAULT_PREFETCH;
 
     interface Discriminator {
         boolean matches(AmqpHeader header);
@@ -81,6 +84,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
                 match = DISCRIMINATORS.get(0);
             }
             IAmqpProtocolConverter next = match.create(transport);
+            next.setPrefetch(prefetch);
             transport.setProtocolConverter(next);
             for (Command send : pendingCommands) {
                 next.onActiveMQCommand(send);
@@ -104,4 +108,9 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
     @Override
     public void updateTracer() {
     }
+
+    @Override
+    public void setPrefetch(int prefetch) {
+        this.prefetch = prefetch;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/838bbebe/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 4951d13..fb275b8 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -117,7 +117,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
     private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
-    protected int prefetch = 100;
+    protected int prefetch;
     protected Transport protonTransport = Proton.transport();
     protected Connection protonConnection = Proton.connection();
     protected Collector eventCollector = new CollectorImpl();
@@ -780,11 +780,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
         // Client is producing to this receiver object
         org.apache.qpid.proton.amqp.transport.Target remoteTarget = receiver.getRemoteTarget();
+        int flow = prefetch;
+        // use client's preference if set
+        if (receiver.getRemoteCredit() != 0) {
+            flow = receiver.getRemoteCredit();
+        }
         try {
             if (remoteTarget instanceof Coordinator) {
                 pumpProtonToSocket();
                 receiver.setContext(coordinatorContext);
-                receiver.flow(prefetch);
+                receiver.flow(flow);
                 receiver.open();
                 pumpProtonToSocket();
             } else {
@@ -804,7 +809,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 ProducerContext producerContext = new ProducerContext(producerId, dest);
 
                 receiver.setContext(producerContext);
-                receiver.flow(prefetch);
+                receiver.flow(flow);
                 ProducerInfo producerInfo = new ProducerInfo(producerId);
                 producerInfo.setDestination(dest);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
@@ -1258,7 +1263,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             consumerInfo.setSelector(selector);
             consumerInfo.setNoRangeAcks(true);
             consumerInfo.setDestination(dest);
-            consumerInfo.setPrefetchSize(100);
+            // use client's preference if set
+            if (sender.getRemoteCredit() != 0) {
+                consumerInfo.setPrefetchSize(sender.getRemoteCredit());
+            } else {
+                consumerInfo.setPrefetchSize(prefetch);
+            }
             consumerInfo.setDispatchAsync(true);
             if (source.getDistributionMode() == COPY && dest.isQueue()) {
                 consumerInfo.setBrowser(true);
@@ -1372,4 +1382,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         condition.setDescription(description);
         return condition;
     }
+
+    public void setPrefetch(int prefetch) {
+        this.prefetch = prefetch;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/838bbebe/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index 0f0badb..41256c6 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -176,4 +176,8 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
     public void setProtocolConverter(IAmqpProtocolConverter protocolConverter) {
         this.protocolConverter = protocolConverter;
     }
+
+    public void setPrefetch(int prefetch) {
+        protocolConverter.setPrefetch(prefetch);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/838bbebe/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
index d4eb648..e3621bf 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
@@ -31,4 +31,6 @@ public interface IAmqpProtocolConverter {
     void onActiveMQCommand(Command command) throws Exception;
 
     void updateTracer();
+
+    void setPrefetch(int prefetch);
 }