You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2015/05/05 20:35:58 UTC
activemq git commit: AMQ-5763: ensure we replenish the credit for the transaction coordinator link
Repository: activemq
Updated Branches:
refs/heads/master 32f21ff1a -> fa81c1ff7
AMQ-5763: ensure we replenish the credit for the transaction coordinator link
https://issues.apache.org/jira/browse/AMQ-5763
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fa81c1ff
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fa81c1ff
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fa81c1ff
Branch: refs/heads/master
Commit: fa81c1ff732fd50312ba349c8226c2d84345ad1d
Parents: 32f21ff
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue May 5 19:21:32 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue May 5 19:35:20 2015 +0100
----------------------------------------------------------------------
.../transport/amqp/protocol/AmqpAbstractReceiver.java | 13 +++++++++++++
.../activemq/transport/amqp/protocol/AmqpReceiver.java | 13 -------------
.../amqp/protocol/AmqpTransactionCoordinator.java | 10 ++++++++++
.../transport/amqp/JMSClientTransactionTest.java | 2 --
4 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/fa81c1ff/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
index 7436a78..e119f61 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
@@ -32,6 +32,7 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
protected ByteArrayOutputStream current = new ByteArrayOutputStream();
protected final byte[] recvBuffer = new byte[1024 * 8];
+ protected final int configuredCredit;
/**
* Handle create of new AMQP Receiver instance.
@@ -43,6 +44,7 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
*/
public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) {
super(session, endpoint);
+ this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
}
@Override
@@ -54,6 +56,17 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
}
/**
+ * Returns the amount of receiver credit that has been configured for this AMQP
+ * transport. If no value was configured on the TransportConnector URI then a
+ * sensible default is used.
+ *
+ * @return the configured receiver credit to grant.
+ */
+ public int getConfiguredReceiverCredit() {
+ return configuredCredit;
+ }
+
+ /**
* Provide the receiver endpoint with the given amount of credits.
*
* @param credits
http://git-wip-us.apache.org/repos/asf/activemq/blob/fa81c1ff/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 5630d6c..5d8b502 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -64,7 +64,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
private final ProducerInfo producerInfo;
- private final int configuredCredit;
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private InboundTransformer inboundTransformer;
@@ -83,7 +82,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
super(session, endpoint);
this.producerInfo = producerInfo;
- this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
}
@Override
@@ -125,17 +123,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
return producerInfo.getDestination() == null;
}
- /**
- * Returns the amount of receiver credit that has been configured for this AMQP
- * transport. If no value was configured on the TransportConnector URI then a
- * sensible default is used.
- *
- * @return the configured receiver credit to grant.
- */
- public int getConfiguredReceiverCredit() {
- return configuredCredit;
- }
-
//----- Internal Implementation ------------------------------------------//
protected InboundTransformer getInboundTransformer() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/fa81c1ff/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
index 14fa3ad..576ce20 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
@@ -145,6 +145,16 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
} else {
throw new Exception("Expected coordinator message type: " + action.getClass());
}
+
+ replenishCredit();
+ }
+
+ private void replenishCredit() {
+ if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
+ LOG.debug("Sending more credit ({}) to transaction coordinator on session {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), session.getSessionId());
+ getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
+ session.pumpProtonToSocket();
+ }
}
private long getNextTransactionId() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/fa81c1ff/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
index 587ac3e..47dc9ec 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
@@ -26,7 +26,6 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +39,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
private final int MSG_COUNT = 1000;
- @Ignore("Fails currently")
@Test(timeout = 60000)
public void testSingleConsumedMessagePerTxCase() throws Exception {
connection = createConnection();