You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/14 21:19:00 UTC

[1/2] activemq-artemis git commit: This closes #1095

Repository: activemq-artemis
Updated Branches:
  refs/heads/master b674c6b84 -> 9e6c40a8d


This closes #1095


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9e6c40a8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9e6c40a8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9e6c40a8

Branch: refs/heads/master
Commit: 9e6c40a8dec22ac50c1d29b8222a68a75f9a4292
Parents: b674c6b 7282b68
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 14 17:13:05 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 14 17:13:05 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/proton/AMQPSessionContext.java    |  2 +-
 .../transaction/ProtonTransactionHandler.java       | 13 +++++++++++--
 .../tests/integration/amqp/AmqpTransactionTest.java | 16 ++++++++++++++++
 3 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-1039 Transaction Coordinator credit refill

Posted by cl...@apache.org.
ARTEMIS-1039 Transaction Coordinator credit refill

The coordinator needs to refill credit on the receiver once it has been
exhausted, otherwise the remote cannot send additional declare or
discharge commands to the broker.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7282b689
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7282b689
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7282b689

Branch: refs/heads/master
Commit: 7282b6890aaafce7209daa89311715586f9b4564
Parents: b674c6b
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 14 16:07:58 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 14 17:13:05 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/proton/AMQPSessionContext.java    |  2 +-
 .../transaction/ProtonTransactionHandler.java       | 13 +++++++++++--
 .../tests/integration/amqp/AmqpTransactionTest.java | 16 ++++++++++++++++
 3 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7282b689/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index 89b6ed3..d1fc0e1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -148,7 +148,7 @@ public class AMQPSessionContext extends ProtonInitializable {
 
       receiver.setContext(transactionHandler);
       receiver.open();
-      receiver.flow(100);
+      receiver.flow(ProtonTransactionHandler.DEFAULT_COORDINATOR_CREDIT);
    }
 
    public void addSender(Sender sender) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7282b689/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 2cdb072..721bd33 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
@@ -38,6 +36,9 @@ import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.jboss.logging.Logger;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+
 /**
  * handles an amqp Coordinator to deal with transaction boundaries etc
  */
@@ -45,6 +46,8 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
 
    private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
 
+   public static final int DEFAULT_COORDINATOR_CREDIT = 100;
+
    final AMQPSessionCallback sessionSPI;
 
    public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
@@ -98,6 +101,12 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
                }
             }
+
+            // Replenish coordinator receiver credit on exhaustion so sender can continue
+            // transaction declare and discahrge operations.
+            if (receiver.getCredit() == 0) {
+               receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+            }
          }
       } catch (ActiveMQAMQPException amqpE) {
          delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7282b689/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index 1b2a1b0..c00cc1c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -49,6 +49,22 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 30000)
+   public void testCoordinatorReplenishesCredit() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+      assertNotNull(session);
+
+      for (int i = 0; i < 1000; ++i) {
+         session.begin();
+         assertTrue(session.isInTransaction());
+         session.commit();
+      }
+
+      connection.close();
+   }
+
+   @Test(timeout = 30000)
    public void testBeginAndRollbackTransaction() throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());