You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2015/05/05 20:35:58 UTC

activemq git commit: AMQ-5763: ensure we replenish the credit for the transaction coordinator link

Repository: activemq
Updated Branches:
  refs/heads/master 32f21ff1a -> fa81c1ff7


AMQ-5763: ensure we replenish the credit for the transaction coordinator link

https://issues.apache.org/jira/browse/AMQ-5763


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/fa81c1ff
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/fa81c1ff
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/fa81c1ff

Branch: refs/heads/master
Commit: fa81c1ff732fd50312ba349c8226c2d84345ad1d
Parents: 32f21ff
Author: Robert Gemmell <ro...@apache.org>
Authored: Tue May 5 19:21:32 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Tue May 5 19:35:20 2015 +0100

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpAbstractReceiver.java  | 13 +++++++++++++
 .../activemq/transport/amqp/protocol/AmqpReceiver.java | 13 -------------
 .../amqp/protocol/AmqpTransactionCoordinator.java      | 10 ++++++++++
 .../transport/amqp/JMSClientTransactionTest.java       |  2 --
 4 files changed, 23 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/fa81c1ff/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
index 7436a78..e119f61 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpAbstractReceiver.java
@@ -32,6 +32,7 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
 
     protected ByteArrayOutputStream current = new ByteArrayOutputStream();
     protected final byte[] recvBuffer = new byte[1024 * 8];
+    protected final int configuredCredit;
 
     /**
      * Handle create of new AMQP Receiver instance.
@@ -43,6 +44,7 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
      */
     public AmqpAbstractReceiver(AmqpSession session, Receiver endpoint) {
         super(session, endpoint);
+        this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
     }
 
     @Override
@@ -54,6 +56,17 @@ public abstract class AmqpAbstractReceiver extends AmqpAbstractLink<Receiver> {
     }
 
     /**
+     * Returns the amount of receiver credit that has been configured for this AMQP
+     * transport.  If no value was configured on the TransportConnector URI then a
+     * sensible default is used.
+     *
+     * @return the configured receiver credit to grant.
+     */
+    public int getConfiguredReceiverCredit() {
+        return configuredCredit;
+    }
+
+    /**
      * Provide the receiver endpoint with the given amount of credits.
      *
      * @param credits

http://git-wip-us.apache.org/repos/asf/activemq/blob/fa81c1ff/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 5630d6c..5d8b502 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -64,7 +64,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
     private static final Logger LOG = LoggerFactory.getLogger(AmqpReceiver.class);
 
     private final ProducerInfo producerInfo;
-    private final int configuredCredit;
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
 
     private InboundTransformer inboundTransformer;
@@ -83,7 +82,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
         super(session, endpoint);
 
         this.producerInfo = producerInfo;
-        this.configuredCredit = session.getConnection().getConfiguredReceiverCredit();
     }
 
     @Override
@@ -125,17 +123,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
         return producerInfo.getDestination() == null;
     }
 
-    /**
-     * Returns the amount of receiver credit that has been configured for this AMQP
-     * transport.  If no value was configured on the TransportConnector URI then a
-     * sensible default is used.
-     *
-     * @return the configured receiver credit to grant.
-     */
-    public int getConfiguredReceiverCredit() {
-        return configuredCredit;
-    }
-
     //----- Internal Implementation ------------------------------------------//
 
     protected InboundTransformer getInboundTransformer() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fa81c1ff/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
index 14fa3ad..576ce20 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpTransactionCoordinator.java
@@ -145,6 +145,16 @@ public class AmqpTransactionCoordinator extends AmqpAbstractReceiver {
         } else {
             throw new Exception("Expected coordinator message type: " + action.getClass());
         }
+
+        replenishCredit();
+    }
+
+    private void replenishCredit() {
+        if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
+            LOG.debug("Sending more credit ({}) to transaction coordinator on session {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), session.getSessionId());
+            getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
+            session.pumpProtonToSocket();
+        }
     }
 
     private long getNextTransactionId() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/fa81c1ff/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
index 587ac3e..47dc9ec 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTransactionTest.java
@@ -26,7 +26,6 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,7 +39,6 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
 
     private final int MSG_COUNT = 1000;
 
-    @Ignore("Fails currently")
     @Test(timeout = 60000)
     public void testSingleConsumedMessagePerTxCase() throws Exception {
         connection = createConnection();