You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/17 21:28:58 UTC

[2/3] activemq-artemis git commit: ARTEMIS-1046 Fixing TX eventually stalling with AMQP

ARTEMIS-1046 Fixing TX eventually stalling with AMQP

I have also reviewed the model in which we used transactions


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1ef4dcf7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1ef4dcf7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1ef4dcf7

Branch: refs/heads/master
Commit: 1ef4dcf7d921379618363d035ea8a09794c3637d
Parents: 291a471
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 17 15:59:34 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 17 16:50:56 2017 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPConnectionCallback.java     | 25 ++++--
 .../amqp/broker/AMQPSessionCallback.java        | 30 +++----
 .../amqp/proton/AMQPSessionContext.java         |  2 +-
 .../proton/ProtonServerReceiverContext.java     |  4 +-
 .../amqp/proton/ProtonServerSenderContext.java  |  2 +-
 .../transaction/ProtonTransactionHandler.java   | 78 ++++++++++-------
 .../protocol/amqp/util/DeliveryUtil.java        | 18 +---
 .../integration/amqp/AmqpTransactionTest.java   | 90 +++++++++++++++++++-
 8 files changed, 165 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
index 7e7dc60..4265f28 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java
@@ -62,7 +62,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
 
    private static final Logger logger = Logger.getLogger(AMQPConnectionCallback.class);
 
-   private ConcurrentMap<XidImpl, Transaction> transactions = new ConcurrentHashMap<>();
+   private ConcurrentMap<Binary, Transaction> transactions = new ConcurrentHashMap<>();
 
    private final ProtonProtocolManager manager;
 
@@ -224,25 +224,32 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
 
    public Binary newTransaction() {
       XidImpl xid = newXID();
+      Binary binary = new Binary(xid.getGlobalTransactionId());
       Transaction transaction = new ProtonTransactionImpl(xid, server.getStorageManager(), -1);
-      transactions.put(xid, transaction);
-      return new Binary(xid.getGlobalTransactionId());
+      transactions.put(binary, transaction);
+      return binary;
    }
 
-   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
-      XidImpl xid = newXID(txid.getArray());
-      Transaction tx = transactions.get(xid);
+   public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
+      Transaction tx;
+
+      if (remove) {
+         tx = transactions.remove(txid);
+      } else {
+         tx = transactions.get(txid);
+      }
 
       if (tx == null) {
-         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(xid.toString());
+         logger.warn("Couldn't find txid = " + txid);
+         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.txNotFound(txid.toString());
       }
 
       return tx;
    }
 
-   public void removeTransaction(Binary txid) {
+   public Transaction removeTransaction(Binary txid) {
       XidImpl xid = newXID(txid.getArray());
-      transactions.remove(xid);
+      return transactions.remove(xid);
    }
 
    protected XidImpl newXID() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 7f7e22b..3592dbc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -47,7 +47,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
-import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
 import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
@@ -92,6 +91,10 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final AtomicBoolean draining = new AtomicBoolean(false);
 
+   public Object getProtonLock() {
+      return connection.getLock();
+   }
+
    public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
@@ -382,8 +385,10 @@ public class AMQPSessionCallback implements SessionCallback {
       condition.setDescription(errorMessage);
       Rejected rejected = new Rejected();
       rejected.setError(condition);
-      delivery.disposition(rejected);
-      delivery.settle();
+      synchronized (connection.getLock()) {
+         delivery.disposition(rejected);
+         delivery.settle();
+      }
       connection.flush();
    }
 
@@ -536,29 +541,14 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public Transaction getTransaction(Binary txid) throws ActiveMQAMQPException {
-      return protonSPI.getTransaction(txid);
+   public Transaction getTransaction(Binary txid, boolean remove) throws ActiveMQAMQPException {
+      return protonSPI.getTransaction(txid, remove);
    }
 
    public Binary newTransaction() {
       return protonSPI.newTransaction();
    }
 
-   public void commitTX(Binary txid) throws Exception {
-      Transaction tx = protonSPI.getTransaction(txid);
-      tx.commit(true);
-      protonSPI.removeTransaction(txid);
-   }
-
-   public void rollbackTX(Binary txid, boolean lastMessageReceived) throws Exception {
-      Transaction tx = protonSPI.getTransaction(txid);
-      tx.rollback();
-      protonSPI.removeTransaction(txid);
-   }
-
-   public void dischargeTx(Binary txid) throws ActiveMQAMQPException {
-      ((ProtonTransactionImpl) protonSPI.getTransaction(txid)).discharge();
-   }
 
    public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
       return serverSession.getMatchingQueue(address, routingType);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
index d1fc0e1..ccc4a6c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java
@@ -142,7 +142,7 @@ public class AMQPSessionContext extends ProtonInitializable {
    }
 
    public void addTransactionHandler(Coordinator coordinator, Receiver receiver) {
-      ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI);
+      ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI, connection);
 
       coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn"));
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index f08c1fc..54467cf 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -155,7 +155,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
          if (delivery.getRemoteState() instanceof TransactionalState) {
 
             TransactionalState txState = (TransactionalState) delivery.getRemoteState();
-            tx = this.sessionSPI.getTransaction(txState.getTxnId());
+            tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
          }
 
          sessionSPI.serverSend(tx, receiver, delivery, address, delivery.getMessageFormat(), data);
@@ -201,8 +201,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
       } else {
          synchronized (connection.getLock()) {
             receiver.flow(credits);
-            connection.flush();
          }
+         connection.flush();
       }
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 0e0447f..5a97c02 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -493,7 +493,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          if (remoteState instanceof TransactionalState) {
 
             TransactionalState txState = (TransactionalState) remoteState;
-            ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId());
+            ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
 
             if (txState.getOutcome() != null) {
                settleImmediate = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 721bd33..12498b0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -18,10 +18,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton.transaction;
 
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
-import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
+import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.ProtonDeliveryHandler;
 import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -36,9 +35,6 @@ import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 /**
  * handles an amqp Coordinator to deal with transaction boundaries etc
  */
@@ -47,17 +43,18 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
    private static final Logger log = Logger.getLogger(ProtonTransactionHandler.class);
 
    public static final int DEFAULT_COORDINATOR_CREDIT = 100;
+   public static final int CREDIT_LOW_WATERMARK = 30;
 
    final AMQPSessionCallback sessionSPI;
+   final AMQPConnectionContext connection;
 
-   public ProtonTransactionHandler(AMQPSessionCallback sessionSPI) {
+   public ProtonTransactionHandler(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection) {
       this.sessionSPI = sessionSPI;
+      this.connection = connection;
    }
 
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-
       final Receiver receiver;
       try {
          receiver = ((Receiver) delivery.getLink());
@@ -66,9 +63,21 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             return;
          }
 
-         receiver.recv(new NettyWritable(buffer));
+         byte[] buffer;
+
+         synchronized (connection.getLock()) {
+            // Replenish coordinator receiver credit once it drops below the low watermark so
+            // the sender can continue transaction declare and discharge operations.
+            if (receiver.getCredit() < CREDIT_LOW_WATERMARK) {
+               receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+            }
+
+            buffer = new byte[delivery.available()];
+            receiver.recv(buffer, 0, buffer.length);
+            receiver.advance();
+         }
+
 
-         receiver.advance();
 
          MessageImpl msg = DeliveryUtil.decodeMessageImpl(buffer);
 
@@ -78,44 +87,47 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
             Binary txID = sessionSPI.newTransaction();
             Declared declared = new Declared();
             declared.setTxnId(txID);
-            delivery.disposition(declared);
+            synchronized (connection.getLock()) {
+               delivery.disposition(declared);
+            }
          } else if (action instanceof Discharge) {
             Discharge discharge = (Discharge) action;
 
             Binary txID = discharge.getTxnId();
-            sessionSPI.dischargeTx(txID);
+            ProtonTransactionImpl tx = (ProtonTransactionImpl)sessionSPI.getTransaction(txID, true);
+            tx.discharge();
+
             if (discharge.getFail()) {
-               try {
-                  sessionSPI.rollbackTX(txID, true);
+               tx.rollback();
+               synchronized (connection.getLock()) {
                   delivery.disposition(new Accepted());
-               } catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage());
                }
+               connection.flush();
             } else {
-               try {
-                  sessionSPI.commitTX(txID);
+               tx.commit();
+               synchronized (connection.getLock()) {
                   delivery.disposition(new Accepted());
-               } catch (ActiveMQAMQPException amqpE) {
-                  throw amqpE;
-               } catch (Exception e) {
-                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
                }
-            }
-
-            // Replenish coordinator receiver credit on exhaustion so sender can continue
-            // transaction declare and discahrge operations.
-            if (receiver.getCredit() == 0) {
-               receiver.flow(DEFAULT_COORDINATOR_CREDIT);
+               connection.flush();
             }
          }
       } catch (ActiveMQAMQPException amqpE) {
-         delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
-      } catch (Exception e) {
+         log.warn(amqpE.getMessage(), amqpE);
+         synchronized (connection.getLock()) {
+            delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+         }
+         connection.flush();
+      } catch (Throwable e) {
          log.warn(e.getMessage(), e);
-         delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+         synchronized (connection.getLock()) {
+            delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
+         }
+         connection.flush();
       } finally {
-         delivery.settle();
-         buffer.release();
+         synchronized (connection.getLock()) {
+            delivery.settle();
+         }
+         connection.flush();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
index 9257c6b..4267b85 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/DeliveryUtil.java
@@ -16,28 +16,14 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.util;
 
-import io.netty.buffer.ByteBuf;
-import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 
 public class DeliveryUtil {
 
-   public static int readDelivery(Receiver receiver, ByteBuf buffer) {
-      int initial = buffer.writerIndex();
-      // optimization by norman
-      int count;
-      while ((count = receiver.recv(buffer.array(), buffer.arrayOffset() + buffer.writerIndex(), buffer.writableBytes())) > 0) {
-         // Increment the writer index by the number of bytes written into it while calling recv.
-         buffer.writerIndex(buffer.writerIndex() + count);
-         buffer.ensureWritable(count);
-      }
-      return buffer.writerIndex() - initial;
-   }
-
-   public static MessageImpl decodeMessageImpl(ByteBuf buffer) {
+   public static MessageImpl decodeMessageImpl(byte[] data) {
       MessageImpl message = (MessageImpl) Message.Factory.create();
-      message.decode(buffer.array(), buffer.arrayOffset() + buffer.readerIndex(), buffer.readableBytes());
+      message.decode(data, 0, data.length);
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1ef4dcf7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index c00cc1c..41bc5e7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,9 +17,20 @@
 
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -27,6 +38,8 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -788,4 +801,77 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
 
       connection.close();
    }
+
+   @Test(timeout = 120000)
+   public void testSendPersistentTX() throws Exception {
+      int MESSAGE_COUNT = 100000;
+      AtomicInteger errors = new AtomicInteger(0);
+      server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false, 1, false, true);
+      ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
+      Connection sendConnection = factory.createConnection();
+      Connection consumerConnection = factory.createConnection();
+      try {
+
+         Thread receiverThread = new Thread() {
+            @Override
+            public void run() {
+               try {
+                  consumerConnection.start();
+                  Session consumerSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+                  javax.jms.Queue q1 = consumerSession.createQueue("q1");
+
+                  MessageConsumer consumer = consumerSession.createConsumer(q1);
+
+                  for (int i = 1; i <= MESSAGE_COUNT; i++) {
+                     Message message = consumer.receive(5000);
+                     if (message == null) {
+                        throw new IOException("No message read in time.");
+                     }
+
+                     if (i % 100 == 0) {
+                        if (i % 1000 == 0) System.out.println("Read message " + i);
+                        consumerSession.commit();
+                     }
+                  }
+
+                  // Ensure that all messages are consumed
+                  consumerSession.commit();
+               } catch (Exception e) {
+                  e.printStackTrace();
+                  errors.incrementAndGet();
+               }
+
+            }
+         };
+
+         receiverThread.start();
+
+         Session sendingSession = sendConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+         javax.jms.Queue q1 = sendingSession.createQueue("q1");
+         MessageProducer producer = sendingSession.createProducer(q1);
+         producer.setDeliveryDelay(DeliveryMode.NON_PERSISTENT);
+         for (int i = 0; i < MESSAGE_COUNT; i++) {
+            producer.send(sendingSession.createTextMessage("message " + i), DeliveryMode.PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+            if (i % 100 == 0) {
+               if (i % 1000 == 0) System.out.println("Sending " + i);
+               sendingSession.commit();
+            }
+         }
+
+         sendingSession.commit();
+
+         receiverThread.join(50000);
+         Assert.assertFalse(receiverThread.isAlive());
+
+         Assert.assertEquals(0, errors.get());
+
+      } catch (Exception e) {
+         e.printStackTrace();
+      } finally {
+         sendConnection.close();
+         consumerConnection.close();
+      }
+
+   }
 }