You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/13 23:20:32 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5661
Repository: activemq
Updated Branches:
refs/heads/master 72839b78a -> 6a6ef45ee
https://issues.apache.org/jira/browse/AMQ-5661
Always honor the link credit as true prefetch value for the
subscription. Enables previously failing test to verify.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6a6ef45e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6a6ef45e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6a6ef45e
Branch: refs/heads/master
Commit: 6a6ef45ee04d332ed3905f79ea87527fa6264d94
Parents: 72839b7
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 13 18:20:26 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 13 18:20:26 2015 -0400
----------------------------------------------------------------------
.../amqp/AMQPProtocolDiscriminator.java | 7 ---
.../transport/amqp/AmqpProtocolConverter.java | 46 +++++---------------
.../transport/amqp/AmqpTransportFilter.java | 5 ++-
.../transport/amqp/IAmqpProtocolConverter.java | 2 -
.../amqp/interop/AmqpReceiverTest.java | 1 -
5 files changed, 16 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index b484500..f5b457b 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -33,7 +33,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
private final AmqpTransport transport;
private final BrokerService brokerService;
- private int prefetch = 0;
private int producerCredit = DEFAULT_PREFETCH;
interface Discriminator {
@@ -90,7 +89,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
}
IAmqpProtocolConverter next = match.create(transport, brokerService);
- next.setPrefetch(prefetch);
next.setProducerCredit(producerCredit);
transport.setProtocolConverter(next);
for (Command send : pendingCommands) {
@@ -117,11 +115,6 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
}
@Override
- public void setPrefetch(int prefetch) {
- this.prefetch = prefetch;
- }
-
- @Override
public void setProducerCredit(int producerCredit) {
this.producerCredit = producerCredit;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 39c8c2b..3661f3d 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -155,7 +155,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final BrokerService brokerService;
private AuthenticationBroker authenticator;
- protected int prefetch;
protected int producerCredit;
protected Transport protonTransport = Proton.transport();
protected Connection protonConnection = Proton.connection();
@@ -410,17 +409,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
int credit = link.getCredit();
if (context instanceof ConsumerContext) {
ConsumerContext consumerContext = (ConsumerContext)context;
- // change consumer prefetch if it's not been already set using
- // transport connector property or consumer preference
- if (consumerContext.consumerPrefetch == 0 && credit > 0) {
+
+ if (credit != consumerContext.credit) {
+ consumerContext.credit = credit >= 0 ? credit : 0;
ConsumerControl control = new ConsumerControl();
control.setConsumerId(consumerContext.consumerId);
control.setDestination(consumerContext.destination);
- control.setPrefetch(credit);
- consumerContext.consumerPrefetch = credit;
+ control.setPrefetch(consumerContext.credit);
sendToActiveMQ(control, null);
}
- consumerContext.credit = credit;
}
((AmqpDeliveryListener) link.getContext()).drainCheck();
}
@@ -1061,7 +1058,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
public ConsumerInfo info;
private boolean endOfBrowse = false;
public int credit;
- public int consumerPrefetch = 0;
private long lastDeliveredSequenceId;
protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
@@ -1481,33 +1477,16 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
destination = createDestination(source);
}
+ int senderCredit = sender.getRemoteCredit();
+
subscriptionsByConsumerId.put(id, consumerContext);
ConsumerInfo consumerInfo = new ConsumerInfo(id);
- consumerContext.info = consumerInfo;
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
consumerInfo.setDestination(destination);
- consumerContext.setDestination(destination);
- int senderCredit = sender.getRemoteCredit();
- if (prefetch != 0) {
- // use the value configured on the transport connector
- // this value will not be changed to the consumer's preference
- consumerInfo.setPrefetchSize(prefetch);
- consumerContext.consumerPrefetch = prefetch;
- } else {
- if (senderCredit != 0) {
- // set the prefetch to the value of the remote credit
- // and ignore the later changes
- consumerInfo.setPrefetchSize(senderCredit);
- consumerContext.consumerPrefetch = senderCredit;
- } else {
- // set zero value for now and change to the consumer's preference
- // on the first flow packet
- consumerInfo.setPrefetchSize(0);
- }
- }
- consumerContext.credit = senderCredit;
+ consumerInfo.setPrefetchSize(senderCredit >= 0 ? senderCredit : 0);
consumerInfo.setDispatchAsync(true);
+
if (source.getDistributionMode() == COPY && destination.isQueue()) {
consumerInfo.setBrowser(true);
}
@@ -1521,6 +1500,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerInfo.setNoLocal(true);
}
+ consumerContext.info = consumerInfo;
+ consumerContext.setDestination(destination);
+ consumerContext.credit = senderCredit;
+
sendToActiveMQ(consumerInfo, new ResponseHandler() {
@Override
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
@@ -1657,11 +1640,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
@Override
- public void setPrefetch(int prefetch) {
- this.prefetch = prefetch;
- }
-
- @Override
public void setProducerCredit(int producerCredit) {
this.producerCredit = producerCredit;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
index fb7542b..5dfdf75 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
@@ -187,8 +187,11 @@ public class AmqpTransportFilter extends TransportFilter implements AmqpTranspor
this.protocolConverter = protocolConverter;
}
+ /**
+ * @deprecated AMQP receiver configures it's prefetch via flow, remove on next release.
+ */
+ @Deprecated
public void setPrefetch(int prefetch) {
- protocolConverter.setPrefetch(prefetch);
}
public void setProducerCredit(int producerCredit) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
index 3e365ae..8296ef2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/IAmqpProtocolConverter.java
@@ -32,7 +32,5 @@ public interface IAmqpProtocolConverter {
void updateTracer();
- void setPrefetch(int prefetch);
-
void setProducerCredit(int producerCredit);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/6a6ef45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
index 1245811..1bc3d66 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpReceiverTest.java
@@ -141,7 +141,6 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
connection.close();
}
- @Ignore("Fails due to issues with accept and no credit")
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
int MSG_COUNT = 4;