You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/21 14:09:30 UTC

[activemq-artemis] 02/03: ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 25d0b511ce31387a1baa767873af76e9635b1bd3
Author: Michael Pearce <mi...@me.com>
AuthorDate: Wed Aug 21 08:19:32 2019 +0100

    ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  2 +-
 .../proton/transaction/ProtonTransactionImpl.java  |  2 +-
 .../apache/activemq/artemis/core/server/Queue.java |  6 +++
 .../artemis/core/server/ServerConsumer.java        |  2 +
 .../artemis/core/server/impl/QueueImpl.java        | 48 ++++++++++++++++-
 .../artemis/core/server/impl/RefsOperation.java    |  7 ++-
 .../core/server/impl/ServerConsumerImpl.java       |  9 +++-
 .../core/transaction/TransactionOperation.java     |  4 ++
 .../core/transaction/impl/TransactionImpl.java     | 62 +++++++++++-----------
 .../server/impl/ScheduledDeliveryHandlerTest.java  | 10 ++++
 .../tests/integration/cli/DummyServerConsumer.java |  5 ++
 .../tests/integration/client/JMSOrderTest.java     | 50 +++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java | 10 ++++
 13 files changed, 180 insertions(+), 37 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index dc249ff..4b2b669 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -363,7 +363,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    public void closeSender(final Object brokerConsumer) throws Exception {
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
-      consumer.close(false);
+      consumer.close(false, true);
       consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
    }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 123dbb5..83128e1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
    private boolean discharged;
 
    public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
-      super(xid, storageManager, timeoutSeconds);
+      super(xid, storageManager, timeoutSeconds, true);
       addOperation(new TransactionOperationAbstract() {
          @Override
          public void afterCommit(Transaction tx) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index bab38d6..8b91aa9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -146,6 +146,9 @@ public interface Queue extends Bindable,CriticalComponent {
 
    ReferenceCounter getConsumersRefCount();
 
+   /* Called when a message is cancelled back into the queue */
+   void addSorted(List<MessageReference> refs, boolean scheduling);
+
    void reload(MessageReference ref);
 
    void addTail(MessageReference ref);
@@ -154,6 +157,9 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void addHead(MessageReference ref, boolean scheduling);
 
+   /* Called when a message is cancelled back into the queue */
+   void addSorted(MessageReference ref, boolean scheduling);
+
    void addHead(List<MessageReference> refs, boolean scheduling);
 
    void acknowledge(MessageReference ref) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index f1f8b1e..0c9c5bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -63,6 +63,8 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
 
    void close(boolean failed) throws Exception;
 
+   void close(boolean failed, boolean sorted) throws Exception;
+
    /**
     * This method is just to remove itself from Queues.
     * If for any reason during a close an exception occurred, the exception treatment
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e19d9ef..090a83e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -896,6 +896,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    /* Called when a message is cancelled back into the queue */
    @Override
+   public void addSorted(final MessageReference ref, boolean scheduling) {
+      enterCritical(CRITICAL_PATH_ADD_HEAD);
+      synchronized (this) {
+         try {
+            if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
+               return;
+            }
+
+            internalAddSorted(ref);
+
+            directDeliver = false;
+         } finally {
+            leaveCritical(CRITICAL_PATH_ADD_HEAD);
+         }
+      }
+   }
+
+   /* Called when a message is cancelled back into the queue */
+   @Override
    public void addHead(final List<MessageReference> refs, boolean scheduling) {
       enterCritical(CRITICAL_PATH_ADD_HEAD);
       synchronized (this) {
@@ -913,6 +932,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   /* Called when a message is cancelled back into the queue */
+   @Override
+   public void addSorted(final List<MessageReference> refs, boolean scheduling) {
+      enterCritical(CRITICAL_PATH_ADD_HEAD);
+      synchronized (this) {
+         try {
+            for (MessageReference ref : refs) {
+               addSorted(ref, scheduling);
+            }
+
+            resetAllIterators();
+
+            deliverAsync();
+         } finally {
+            leaveCritical(CRITICAL_PATH_ADD_HEAD);
+         }
+      }
+   }
+
    @Override
    public synchronized void reload(final MessageReference ref) {
       queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
@@ -3461,13 +3499,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    void postRollback(final LinkedList<MessageReference> refs) {
+      postRollback(refs, false);
+   }
+
+   void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
       //if we have purged then ignore adding the messages back
       if (purgeOnNoConsumers && getConsumerCount() == 0) {
          purgeAfterRollback(refs);
 
          return;
       }
-      addHead(refs, false);
+      if (sorted) {
+         addSorted(refs, false);
+      } else {
+         addHead(refs, false);
+      }
    }
 
    private void purgeAfterRollback(LinkedList<MessageReference> refs) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index c8d9297..925f439 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -79,6 +79,11 @@ public class RefsOperation extends TransactionOperationAbstract {
 
    @Override
    public void afterRollback(final Transaction tx) {
+      afterRollback(tx, false);
+   }
+
+   @Override
+   public void afterRollback(final Transaction tx, boolean sorted) {
       Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
 
       long timeBase = System.currentTimeMillis();
@@ -109,7 +114,7 @@ public class RefsOperation extends TransactionOperationAbstract {
          QueueImpl queue = entry.getKey();
 
          synchronized (queue) {
-            queue.postRollback(refs);
+            queue.postRollback(refs, sorted);
          }
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 54cf9a2..ddba797 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -529,7 +529,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public synchronized void close(final boolean failed) throws Exception {
+   public void close(final boolean failed) throws Exception {
+      close(failed, false);
+   }
+
+   @Override
+   public synchronized void close(final boolean failed, boolean sorted) throws Exception {
 
       // Close should only ever be done once per consumer.
       if (isClosed) return;
@@ -555,7 +560,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       List<MessageReference> refs = cancelRefs(failed, false, null);
 
-      Transaction tx = new TransactionImpl(storageManager);
+      Transaction tx = new TransactionImpl(storageManager, sorted);
 
       refs.forEach(ref -> {
          if (logger.isTraceEnabled()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
index 5da1d97..5c7e7e6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
@@ -52,6 +52,10 @@ public interface TransactionOperation {
     */
    void afterRollback(Transaction tx);
 
+   default void afterRollback(Transaction tx, boolean sorted) {
+      afterRollback(tx);
+   }
+
    List<MessageReference> getRelatedMessageReferences();
 
    List<MessageReference> getListOnConsumer(long consumerID);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index d459975..95983b7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -63,6 +63,8 @@ public class TransactionImpl implements Transaction {
 
    private final long createTime;
 
+   private final boolean sorted;
+
    private volatile boolean containsPersistent;
 
    private int timeoutSeconds = -1;
@@ -96,47 +98,45 @@ public class TransactionImpl implements Transaction {
    }
 
    public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
-      this.storageManager = storageManager;
-
-      xid = null;
-
-      id = storageManager.generateID();
-
-      createTime = System.currentTimeMillis();
-
-      this.timeoutSeconds = timeoutSeconds;
+      this(storageManager.generateID(), null, storageManager, timeoutSeconds, false);
    }
 
    public TransactionImpl(final StorageManager storageManager) {
-      this.storageManager = storageManager;
-
-      xid = null;
-
-      id = storageManager.generateID();
+      this(storageManager, false);
+   }
 
-      createTime = System.currentTimeMillis();
+   public TransactionImpl(final StorageManager storageManager, boolean sorted) {
+      this(storageManager.generateID(), null, storageManager,-1, sorted);
    }
 
    public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
-      this.storageManager = storageManager;
-
-      this.xid = xid;
+      this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false);
+   }
 
-      id = storageManager.generateID();
+   public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) {
+      this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted);
+   }
 
-      createTime = System.currentTimeMillis();
+   public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
+      this(id, xid, storageManager, -1, false);
+   }
 
-      this.timeoutSeconds = timeoutSeconds;
+   public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) {
+      this(id, xid, storageManager, -1, sorted);
    }
 
-   public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
+   private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) {
       this.storageManager = storageManager;
 
       this.xid = xid;
 
       this.id = id;
 
-      createTime = System.currentTimeMillis();
+      this.createTime = System.currentTimeMillis();
+
+      this.timeoutSeconds = timeoutSeconds;
+
+      this.sorted = sorted;
    }
 
    // Transaction implementation
@@ -217,7 +217,7 @@ public class TransactionImpl implements Transaction {
                   logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
                }
 
-               internalRollback();
+               internalRollback(sorted);
 
                if (exception != null) {
                   throw exception;
@@ -276,7 +276,7 @@ public class TransactionImpl implements Transaction {
             return;
          }
          if (state == State.ROLLBACK_ONLY) {
-            internalRollback();
+            internalRollback(sorted);
 
             if (exception != null) {
                throw exception;
@@ -379,11 +379,11 @@ public class TransactionImpl implements Transaction {
             }
          }
 
-         internalRollback();
+         internalRollback(sorted);
       }
    }
 
-   private void internalRollback() throws Exception {
+   private void internalRollback(boolean sorted) throws Exception {
       if (logger.isTraceEnabled()) {
          logger.trace("TransactionImpl::internalRollback " + this);
       }
@@ -418,7 +418,7 @@ public class TransactionImpl implements Transaction {
 
          @Override
          public void done() {
-            afterRollback(operationsToComplete);
+            afterRollback(operationsToComplete, sorted);
          }
       });
 
@@ -432,7 +432,7 @@ public class TransactionImpl implements Transaction {
 
             @Override
             public void done() {
-               afterRollback(storeOperationsToComplete);
+               afterRollback(storeOperationsToComplete, sorted);
             }
          });
       }
@@ -562,10 +562,10 @@ public class TransactionImpl implements Transaction {
       }
    }
 
-   private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
+   private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) {
       if (operationsToComplete != null) {
          for (TransactionOperation operation : operationsToComplete) {
-            operation.afterRollback(this);
+            operation.afterRollback(this, sorted);
          }
          // Help out GC here
          operationsToComplete.clear();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index ea15264..b94bd3a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1036,6 +1036,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void addSorted(List<MessageReference> refs, boolean scheduling) {
+         addHead(refs, scheduling);
+      }
+
+      @Override
       public void reload(MessageReference ref) {
 
       }
@@ -1056,6 +1061,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void addSorted(MessageReference ref, boolean scheduling) {
+
+      }
+
+      @Override
       public void addHead(List<MessageReference> refs, boolean scheduling) {
          for (MessageReference ref : refs) {
             addFirst(ref);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index ee7bdbb..9858357 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -91,6 +91,11 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
+   public void close(boolean failed, boolean sorted) throws Exception {
+
+   }
+
+   @Override
    public void removeItself() throws Exception {
 
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
index dcc5a40..f4087d4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -129,4 +129,54 @@ public class JMSOrderTest extends JMSTestBase {
 
    }
 
+   @Test(timeout = 60000)
+   public void testReceiveSomeThenClose() throws Exception {
+      Connection connection = protocolCF.createConnection();
+      try {
+         connection.start();
+
+         int totalCount = 5;
+         int consumeBeforeRollback = 2;
+
+         sendToAmqQueue(totalCount);
+
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(name.getMethodName());
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 1; i <= consumeBeforeRollback; i++) {
+            Message message = consumer.receive(3000);
+            assertNotNull(message);
+            assertEquals("Unexpected message number", i, message.getIntProperty("nr"));
+         }
+
+         session.close();
+
+         session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         queue = session.createQueue(name.getMethodName());
+         consumer = session.createConsumer(queue);
+
+         // Consume again.. the previously consumed messages should get delivered
+         // again after the rollback and then the remainder should follow
+         List<Integer> messageNumbers = new ArrayList<>();
+         for (int i = 1; i <= totalCount; i++) {
+            Message message = consumer.receive(3000);
+            assertNotNull("Failed to receive message: " + i, message);
+            int msgNum = message.getIntProperty("nr");
+            System.out.println("Received " + msgNum);
+            messageNumbers.add(msgNum);
+         }
+
+         session.commit();
+
+         assertEquals("Unexpected size of list", totalCount, messageNumbers.size());
+         for (int i = 0; i < messageNumbers.size(); i++) {
+            assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i));
+         }
+      } finally {
+         connection.close();
+      }
+
+   }
+
 }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 612d621..da25078 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -269,6 +269,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void addSorted(MessageReference ref, boolean scheduling) {
+
+   }
+
+   @Override
    public void addHead(List<MessageReference> ref, boolean scheduling) {
       // no-op
 
@@ -460,6 +465,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void addSorted(List<MessageReference> refs, boolean scheduling) {
+
+   }
+
+   @Override
    public Set<Consumer> getConsumers() {
       // no-op
       return null;