You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/21 14:09:28 UTC

[activemq-artemis] branch master updated (6fc1133 -> 17f8675)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git.


    from 6fc1133  This closes #2800
     new 61eb379  ARTEMIS-2458 Fix AMQP Transaction Rollback Ordering by using a sorted add
     new 25d0b51  ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering
     new 17f8675  This closes #2807

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../artemis/utils/collections/LinkedListImpl.java  |  62 +++++++
 .../utils/collections/PriorityLinkedList.java      |   2 +
 .../utils/collections/PriorityLinkedListImpl.java  |  17 +-
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   4 +-
 .../proton/transaction/ProtonTransactionImpl.java  |   2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |   2 +-
 .../apache/activemq/artemis/core/server/Queue.java |   9 +-
 .../artemis/core/server/ServerConsumer.java        |   4 +-
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../core/server/impl/MessageReferenceImpl.java     |  23 +++
 .../artemis/core/server/impl/QueueImpl.java        |  75 ++++++++-
 .../artemis/core/server/impl/RefsOperation.java    |   7 +-
 .../core/server/impl/ServerConsumerImpl.java       |  13 +-
 .../core/server/impl/ServerSessionImpl.java        |   2 +-
 .../core/transaction/TransactionOperation.java     |   4 +
 .../core/transaction/impl/TransactionImpl.java     |  62 +++----
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  12 +-
 .../tests/integration/cli/DummyServerConsumer.java |   7 +-
 .../tests/integration/client/JMSOrderTest.java     | 182 +++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  12 +-
 .../artemis/tests/unit/util/LinkedListTest.java    |  73 ++++++++-
 21 files changed, 523 insertions(+), 53 deletions(-)
 create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java


[activemq-artemis] 01/03: ARTEMIS-2458 Fix AMQP Transaction Rollback Ordering by using a sorted add

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 61eb379741ab3faebdffa4d7347507e8bd11d890
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Aug 20 17:05:33 2019 -0400

    ARTEMIS-2458 Fix AMQP Transaction Rollback Ordering by using a sorted add
---
 .../artemis/utils/collections/LinkedListImpl.java  |  62 ++++++++++
 .../utils/collections/PriorityLinkedList.java      |   2 +
 .../utils/collections/PriorityLinkedListImpl.java  |  17 ++-
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |   2 +-
 .../apache/activemq/artemis/core/server/Queue.java |   3 +-
 .../artemis/core/server/ServerConsumer.java        |   2 +-
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../core/server/impl/MessageReferenceImpl.java     |  23 ++++
 .../artemis/core/server/impl/QueueImpl.java        |  27 ++++-
 .../core/server/impl/ServerConsumerImpl.java       |   4 +-
 .../core/server/impl/ServerSessionImpl.java        |   2 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   2 +-
 .../tests/integration/cli/DummyServerConsumer.java |   2 +-
 .../tests/integration/client/JMSOrderTest.java     | 132 +++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   2 +-
 .../artemis/tests/unit/util/LinkedListTest.java    |  73 +++++++++++-
 17 files changed, 343 insertions(+), 16 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index cb20258..7426947 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.utils.collections;
 
 import java.lang.reflect.Array;
+import java.util.Comparator;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 
@@ -43,8 +44,15 @@ public class LinkedListImpl<E> implements LinkedList<E> {
 
    private int nextIndex;
 
+   private final Comparator<E> comparator;
+
    public LinkedListImpl() {
+      this(null);
+   }
+
+   public LinkedListImpl(Comparator<E> comparator) {
       iters = createIteratorArray(INITIAL_ITERATOR_ARRAY_SIZE);
+      this.comparator = comparator;
    }
 
    @Override
@@ -84,6 +92,60 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       }
    }
 
+   public void addSorted(E e) {
+      if (comparator == null) {
+         throw new NullPointerException("comparator=null");
+      }
+      if (size == 0) {
+         addHead(e);
+      } else {
+         if (comparator.compare(head.next.val(), e) < 0) {
+            addHead(e);
+            return;
+         }
+
+         // in our usage, most of the time we will just add to the end
+         // as the QueueImpl cancellations in AMQP will return the buffer back to the queue, in the order they were consumed.
+         // There is an exception to that case, when there are more messages on the queue.
+         // This would be an optimization for our usage.
+         // avoiding scanning the entire List just to add at the end, so we compare the end first.
+         if (comparator.compare(tail.val(), e) >= 0) {
+            addTail(e);
+            return;
+         }
+
+         Node<E> fetching = head.next;
+         while (fetching.next != null) {
+            int compareNext = comparator.compare(fetching.next.val(), e);
+            if (compareNext <= 0) {
+               addAfter(fetching, e);
+               return;
+            }
+            fetching = fetching.next;
+         }
+
+         // this shouldn't happen as the tail was compared before iterating
+         // the only possibilities for this to happen are:
+         // - there is a bug on the comparator
+         // - This method is buggy
+         // - The list wasn't properly synchronized as this list doesn't support concurrent access
+         //
+         // Also I'm not bothering about creating a Logger ID for this, because the only reason for this code to exist
+         //      is because my OCD level is not letting this out.
+         throw new IllegalStateException("Cannot find a suitable place for your element, There's a mismatch in the comparator or there was concurrent adccess on the queue");
+      }
+   }
+
+   private void addAfter(Node<E> node, E e) {
+      Node<E> newNode = Node.with(e);
+      Node<E> nextNode = node.next;
+      node.next = newNode;
+      newNode.prev = node;
+      newNode.next = nextNode;
+      nextNode.prev = newNode;
+      size++;
+   }
+
    @Override
    public E poll() {
       Node<E> ret = head.next;
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
index 19e58c2..9437f55 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java
@@ -27,6 +27,8 @@ public interface PriorityLinkedList<T> {
 
    void addTail(T t, int priority);
 
+   void addSorted(T t, int priority);
+
    T poll();
 
    void clear();
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
index 00cf046..bde9461 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.utils.collections;
 
 import java.lang.reflect.Array;
+import java.util.Comparator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -40,10 +41,15 @@ public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {
    private int lastPriority = -1;
 
    public PriorityLinkedListImpl(final int priorities) {
+      this(priorities, null);
+   }
+
+
+   public PriorityLinkedListImpl(final int priorities, Comparator<T> comparator) {
       levels = (LinkedListImpl<T>[]) Array.newInstance(LinkedListImpl.class, priorities);
 
       for (int i = 0; i < priorities; i++) {
-         levels[i] = new LinkedListImpl<>();
+         levels[i] = new LinkedListImpl<>(comparator);
       }
    }
 
@@ -81,6 +87,15 @@ public class PriorityLinkedListImpl<T> implements PriorityLinkedList<T> {
    }
 
    @Override
+   public void addSorted(T t, int priority) {
+      checkHighest(priority);
+
+      levels[priority].addSorted(t);
+
+      exclusiveIncrementSize(1);
+   }
+
+   @Override
    public T poll() {
       T t = null;
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index f850cc1..dc249ff 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -405,7 +405,7 @@ public class AMQPSessionCallback implements SessionCallback {
    public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
       OperationContext oldContext = recoverContext();
       try {
-         ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
+         ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts, true);
          ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
       } finally {
          resetContext(oldContext);
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index abcfe3f..b473a87 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -131,7 +131,7 @@ public class MQTTPublishManager {
             sendServerMessage(mqttid, message, deliveryCount, qos);
          } else {
             // Client must have disconnected and it's Subscription QoS cleared
-            consumer.individualCancel(message.getMessageID(), false);
+            consumer.individualCancel(message.getMessageID(), false, true);
          }
       }
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 2d7f373..bab38d6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -172,7 +172,8 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void cancel(Transaction tx, MessageReference ref, boolean ignoreRedeliveryCheck);
 
-   void cancel(MessageReference reference, long timeBase) throws Exception;
+   /** @param sorted it should use the messageID as a reference to where to add it in the queue */
+   void cancel(MessageReference reference, long timeBase, boolean sorted) throws Exception;
 
    void deliverAsync();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 4d35919..f1f8b1e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -98,7 +98,7 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
 
    void reject(long messageID) throws Exception;
 
-   void individualCancel(long messageID, boolean failed) throws Exception;
+   void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception;
 
    void forceDelivery(long sequence);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 7d5bbe8..1024118 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -354,7 +354,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
          refqueue = ref.getQueue();
 
          try {
-            refqueue.cancel(ref, timeBase);
+            refqueue.cancel(ref, timeBase, false);
          } catch (Exception e) {
             // There isn't much we can do besides log an error
             ActiveMQServerLogger.LOGGER.errorCancellingRefOnBridge(e, ref);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 97bb7f6..459b703 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.util.Comparator;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Consumer;
 
@@ -33,6 +34,28 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
  */
 public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference, Runnable {
 
+   private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
+
+   public static Comparator<MessageReference> getIDComparator() {
+      return idComparator;
+   }
+
+   private static class MessageReferenceComparatorByID implements Comparator<MessageReference> {
+
+      @Override
+      public int compare(MessageReference o1, MessageReference o2) {
+         long value = o2.getMessage().getMessageID() - o1.getMessage().getMessageID();
+         if (value > 0) {
+            return 1;
+         } else if (value < 0) {
+            return -1;
+         } else {
+            return 0;
+         }
+      }
+   }
+
+
    private static final AtomicIntegerFieldUpdater<MessageReferenceImpl> DELIVERY_COUNT_UPDATER = AtomicIntegerFieldUpdater
       .newUpdater(MessageReferenceImpl.class, "deliveryCount");
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 1c133ef..e19d9ef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -179,7 +179,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
 
    // This is where messages are stored
-   private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES);
+   private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
 
    // The quantity of pagedReferences on messageReferences priority list
    private final AtomicInteger pagedReferences = new AtomicInteger(0);
@@ -1631,11 +1631,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
+   public synchronized void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
       Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
       if (redeliveryResult.getA()) {
          if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
-            internalAddHead(reference);
+            if (sorted) {
+               internalAddSorted(reference);
+            } else {
+               internalAddHead(reference);
+            }
          }
 
          resetAllIterators();
@@ -2469,6 +2473,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       messageReferences.addHead(ref, priority);
    }
 
+   /**
+    * The caller of this method requires synchronized on the queue.
+    * I'm not going to add synchronized to this method just for a precaution,
+    * as I'm not 100% sure this won't cause any extra runtime.
+    *
+    * @param ref
+    */
+   private void internalAddSorted(final MessageReference ref) {
+      queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
+      pendingMetrics.incrementMetrics(ref);
+      refAdded(ref);
+
+      int priority = getPriority(ref);
+
+      messageReferences.addSorted(ref, priority);
+   }
+
    private int getPriority(MessageReference ref) {
       try {
          return ref.getMessage().getPriority();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 37dd74a..54cf9a2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -992,7 +992,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public synchronized void individualCancel(final long messageID, boolean failed) throws Exception {
+   public synchronized void individualCancel(final long messageID, boolean failed, boolean sorted) throws Exception {
       if (browseOnly) {
          return;
       }
@@ -1007,7 +1007,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          ref.decrementDeliveryCount();
       }
 
-      ref.getQueue().cancel(ref, System.currentTimeMillis());
+      ref.getQueue().cancel(ref, System.currentTimeMillis(), sorted);
    }
 
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 9044302..296382b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1146,7 +1146,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       ServerConsumer consumer = locateConsumer(consumerID);
 
       if (consumer != null) {
-         consumer.individualCancel(messageID, failed);
+         consumer.individualCancel(messageID, failed, false);
       }
 
    }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index ad4cbb3..ea15264 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1108,7 +1108,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void cancel(MessageReference reference, long timeBase) throws Exception {
+      public void cancel(MessageReference reference, long timeBase, boolean backInPlace) throws Exception {
 
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 58bf2d3..ee7bdbb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -150,7 +150,7 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
-   public void individualCancel(long messageID, boolean failed) throws Exception {
+   public void individualCancel(long messageID, boolean failed, boolean sorted) throws Exception {
 
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
new file mode 100644
index 0000000..dcc5a40
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
+
+@RunWith(value = Parameterized.class)
+public class JMSOrderTest extends JMSTestBase {
+
+   String protocol;
+
+   ConnectionFactory protocolCF;
+
+   public JMSOrderTest(String protocol) {
+      this.protocol = protocol;
+   }
+
+   @Before
+   public void setupCF() {
+      protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616");
+   }
+
+   @Parameterized.Parameters(name = "protocol={0}")
+   public static Collection getParameters() {
+      return Arrays.asList(new Object[][]{{"AMQP"}, {"OPENWIRE"}, {"CORE"}});
+   }
+
+   protected void sendToAmqQueue(int count) throws Exception {
+      Connection activemqConnection = protocolCF.createConnection();
+      Session amqSession = activemqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue amqTestQueue = amqSession.createQueue(name.getMethodName());
+      sendMessages(activemqConnection, amqTestQueue, count);
+      activemqConnection.close();
+   }
+
+   public void sendMessages(Connection connection, Destination destination, int count) throws Exception {
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer p = session.createProducer(destination);
+
+      for (int i = 1; i <= count; i++) {
+         TextMessage message = session.createTextMessage();
+         message.setText("TextMessage: " + i);
+         message.setIntProperty("nr", i);
+         p.send(message);
+      }
+
+      session.close();
+
+   }
+
+   @Test(timeout = 60000)
+   public void testReceiveSomeThenRollback() throws Exception {
+      Connection connection = protocolCF.createConnection();
+      try {
+         connection.start();
+
+         int totalCount = 5;
+         int consumeBeforeRollback = 2;
+
+         sendToAmqQueue(totalCount);
+
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(name.getMethodName());
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 1; i <= consumeBeforeRollback; i++) {
+            Message message = consumer.receive(3000);
+            assertNotNull(message);
+            assertEquals("Unexpected message number", i, message.getIntProperty("nr"));
+         }
+
+         session.rollback();
+
+         // Consume again.. the previously consumed messages should get delivered
+         // again after the rollback and then the remainder should follow
+         List<Integer> messageNumbers = new ArrayList<>();
+         for (int i = 1; i <= totalCount; i++) {
+            Message message = consumer.receive(3000);
+            assertNotNull("Failed to receive message: " + i, message);
+            int msgNum = message.getIntProperty("nr");
+            System.out.println("Received " + msgNum);
+            messageNumbers.add(msgNum);
+         }
+
+         session.commit();
+
+         assertEquals("Unexpected size of list", totalCount, messageNumbers.size());
+         for (int i = 0; i < messageNumbers.size(); i++) {
+            assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i));
+         }
+      } finally {
+         connection.close();
+      }
+
+   }
+
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 290aa15..612d621 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -351,7 +351,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void cancel(final MessageReference reference, final long timeBase) throws Exception {
+   public void cancel(final MessageReference reference, final long timeBase, boolean sorted) throws Exception {
       // no-op
 
    }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index e7c3eba..5d8f21b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -17,14 +17,18 @@
 package org.apache.activemq.artemis.tests.unit.util;
 
 import java.lang.ref.WeakReference;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -37,7 +41,74 @@ public class LinkedListTest extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
 
-      list = new LinkedListImpl<>();
+      list = new LinkedListImpl<>(integerComparator);
+   }
+
+   Comparator<Integer> integerComparator = new Comparator<Integer>() {
+      @Override
+      public int compare(Integer o1, Integer o2) {
+         if (o1.intValue() == o2.intValue()) {
+            return 0;
+         }
+         if (o2.intValue() > o1.intValue()) {
+            return 1;
+         } else {
+            return -1;
+         }
+      }
+   };
+
+   @Test
+   public void addSorted() {
+
+      list.addSorted(1);
+      list.addSorted(3);
+      list.addSorted(2);
+      list.addSorted(0);
+      validateOrder(null);
+      Assert.assertEquals(4, list.size());
+
+   }
+
+
+   @Test
+   public void randomSorted() {
+
+      HashSet<Integer> values = new HashSet<>();
+      for (int i = 0; i < 1000; i++) {
+
+         int value = RandomUtil.randomInt();
+         if (!values.contains(value)) {
+            values.add(value);
+            list.addSorted(value);
+         }
+      }
+
+      Assert.assertEquals(values.size(), list.size());
+
+      validateOrder(values);
+
+      Assert.assertEquals(0, values.size());
+
+   }
+
+   private void validateOrder(HashSet<Integer> values) {
+      Integer previous = null;
+      LinkedListIterator<Integer> integerIterator = list.iterator();
+      while (integerIterator.hasNext()) {
+
+         Integer value = integerIterator.next();
+         if (previous != null) {
+            Assert.assertTrue(value + " should be > " + previous, integerComparator.compare(previous, value) > 0);
+            Assert.assertTrue(value + " should be > " + previous, value.intValue() > previous.intValue());
+         }
+
+         if (values != null) {
+            values.remove(value);
+         }
+         previous = value;
+      }
+      integerIterator.close();
    }
 
    @Test


[activemq-artemis] 02/03: ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 25d0b511ce31387a1baa767873af76e9635b1bd3
Author: Michael Pearce <mi...@me.com>
AuthorDate: Wed Aug 21 08:19:32 2019 +0100

    ARTEMIS-2458 Fix AMQP Transaction Session Close Ordering
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  2 +-
 .../proton/transaction/ProtonTransactionImpl.java  |  2 +-
 .../apache/activemq/artemis/core/server/Queue.java |  6 +++
 .../artemis/core/server/ServerConsumer.java        |  2 +
 .../artemis/core/server/impl/QueueImpl.java        | 48 ++++++++++++++++-
 .../artemis/core/server/impl/RefsOperation.java    |  7 ++-
 .../core/server/impl/ServerConsumerImpl.java       |  9 +++-
 .../core/transaction/TransactionOperation.java     |  4 ++
 .../core/transaction/impl/TransactionImpl.java     | 62 +++++++++++-----------
 .../server/impl/ScheduledDeliveryHandlerTest.java  | 10 ++++
 .../tests/integration/cli/DummyServerConsumer.java |  5 ++
 .../tests/integration/client/JMSOrderTest.java     | 50 +++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java | 10 ++++
 13 files changed, 180 insertions(+), 37 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index dc249ff..4b2b669 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -363,7 +363,7 @@ public class AMQPSessionCallback implements SessionCallback {
 
    public void closeSender(final Object brokerConsumer) throws Exception {
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
-      consumer.close(false);
+      consumer.close(false, true);
       consumer.getQueue().recheckRefCount(serverSession.getSessionContext());
    }
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 123dbb5..83128e1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -50,7 +50,7 @@ public class ProtonTransactionImpl extends TransactionImpl {
    private boolean discharged;
 
    public ProtonTransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final AMQPConnectionContext connection) {
-      super(xid, storageManager, timeoutSeconds);
+      super(xid, storageManager, timeoutSeconds, true);
       addOperation(new TransactionOperationAbstract() {
          @Override
          public void afterCommit(Transaction tx) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index bab38d6..8b91aa9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -146,6 +146,9 @@ public interface Queue extends Bindable,CriticalComponent {
 
    ReferenceCounter getConsumersRefCount();
 
+   /* Called when a message is cancelled back into the queue */
+   void addSorted(List<MessageReference> refs, boolean scheduling);
+
    void reload(MessageReference ref);
 
    void addTail(MessageReference ref);
@@ -154,6 +157,9 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void addHead(MessageReference ref, boolean scheduling);
 
+   /* Called when a message is cancelled back into the queue */
+   void addSorted(MessageReference ref, boolean scheduling);
+
    void addHead(List<MessageReference> refs, boolean scheduling);
 
    void acknowledge(MessageReference ref) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index f1f8b1e..0c9c5bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -63,6 +63,8 @@ public interface ServerConsumer extends Consumer, ConsumerInfo {
 
    void close(boolean failed) throws Exception;
 
+   void close(boolean failed, boolean sorted) throws Exception;
+
    /**
     * This method is just to remove itself from Queues.
     * If for any reason during a close an exception occurred, the exception treatment
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e19d9ef..090a83e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -896,6 +896,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    /* Called when a message is cancelled back into the queue */
    @Override
+   public void addSorted(final MessageReference ref, boolean scheduling) {
+      enterCritical(CRITICAL_PATH_ADD_HEAD);
+      synchronized (this) {
+         try {
+            if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
+               return;
+            }
+
+            internalAddSorted(ref);
+
+            directDeliver = false;
+         } finally {
+            leaveCritical(CRITICAL_PATH_ADD_HEAD);
+         }
+      }
+   }
+
+   /* Called when a message is cancelled back into the queue */
+   @Override
    public void addHead(final List<MessageReference> refs, boolean scheduling) {
       enterCritical(CRITICAL_PATH_ADD_HEAD);
       synchronized (this) {
@@ -913,6 +932,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   /* Called when a message is cancelled back into the queue */
+   @Override
+   public void addSorted(final List<MessageReference> refs, boolean scheduling) {
+      enterCritical(CRITICAL_PATH_ADD_HEAD);
+      synchronized (this) {
+         try {
+            for (MessageReference ref : refs) {
+               addSorted(ref, scheduling);
+            }
+
+            resetAllIterators();
+
+            deliverAsync();
+         } finally {
+            leaveCritical(CRITICAL_PATH_ADD_HEAD);
+         }
+      }
+   }
+
    @Override
    public synchronized void reload(final MessageReference ref) {
       queueMemorySize.addAndGet(ref.getMessageMemoryEstimate());
@@ -3461,13 +3499,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    void postRollback(final LinkedList<MessageReference> refs) {
+      postRollback(refs, false);
+   }
+
+   void postRollback(final LinkedList<MessageReference> refs, boolean sorted) {
       //if we have purged then ignore adding the messages back
       if (purgeOnNoConsumers && getConsumerCount() == 0) {
          purgeAfterRollback(refs);
 
          return;
       }
-      addHead(refs, false);
+      if (sorted) {
+         addSorted(refs, false);
+      } else {
+         addHead(refs, false);
+      }
    }
 
    private void purgeAfterRollback(LinkedList<MessageReference> refs) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index c8d9297..925f439 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -79,6 +79,11 @@ public class RefsOperation extends TransactionOperationAbstract {
 
    @Override
    public void afterRollback(final Transaction tx) {
+      afterRollback(tx, false);
+   }
+
+   @Override
+   public void afterRollback(final Transaction tx, boolean sorted) {
       Map<QueueImpl, LinkedList<MessageReference>> queueMap = new HashMap<>();
 
       long timeBase = System.currentTimeMillis();
@@ -109,7 +114,7 @@ public class RefsOperation extends TransactionOperationAbstract {
          QueueImpl queue = entry.getKey();
 
          synchronized (queue) {
-            queue.postRollback(refs);
+            queue.postRollback(refs, sorted);
          }
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 54cf9a2..ddba797 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -529,7 +529,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public synchronized void close(final boolean failed) throws Exception {
+   public void close(final boolean failed) throws Exception {
+      close(failed, false);
+   }
+
+   @Override
+   public synchronized void close(final boolean failed, boolean sorted) throws Exception {
 
       // Close should only ever be done once per consumer.
       if (isClosed) return;
@@ -555,7 +560,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
 
       List<MessageReference> refs = cancelRefs(failed, false, null);
 
-      Transaction tx = new TransactionImpl(storageManager);
+      Transaction tx = new TransactionImpl(storageManager, sorted);
 
       refs.forEach(ref -> {
          if (logger.isTraceEnabled()) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
index 5da1d97..5c7e7e6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/TransactionOperation.java
@@ -52,6 +52,10 @@ public interface TransactionOperation {
     */
    void afterRollback(Transaction tx);
 
+   default void afterRollback(Transaction tx, boolean sorted) {
+      afterRollback(tx);
+   }
+
    List<MessageReference> getRelatedMessageReferences();
 
    List<MessageReference> getListOnConsumer(long consumerID);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index d459975..95983b7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -63,6 +63,8 @@ public class TransactionImpl implements Transaction {
 
    private final long createTime;
 
+   private final boolean sorted;
+
    private volatile boolean containsPersistent;
 
    private int timeoutSeconds = -1;
@@ -96,47 +98,45 @@ public class TransactionImpl implements Transaction {
    }
 
    public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds) {
-      this.storageManager = storageManager;
-
-      xid = null;
-
-      id = storageManager.generateID();
-
-      createTime = System.currentTimeMillis();
-
-      this.timeoutSeconds = timeoutSeconds;
+      this(storageManager.generateID(), null, storageManager, timeoutSeconds, false);
    }
 
    public TransactionImpl(final StorageManager storageManager) {
-      this.storageManager = storageManager;
-
-      xid = null;
-
-      id = storageManager.generateID();
+      this(storageManager, false);
+   }
 
-      createTime = System.currentTimeMillis();
+   public TransactionImpl(final StorageManager storageManager, boolean sorted) {
+      this(storageManager.generateID(), null, storageManager,-1, sorted);
    }
 
    public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds) {
-      this.storageManager = storageManager;
-
-      this.xid = xid;
+      this(storageManager.generateID(), xid, storageManager, timeoutSeconds, false);
+   }
 
-      id = storageManager.generateID();
+   public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds, final boolean sorted) {
+      this(storageManager.generateID(), xid, storageManager, timeoutSeconds, sorted);
+   }
 
-      createTime = System.currentTimeMillis();
+   public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
+      this(id, xid, storageManager, -1, false);
+   }
 
-      this.timeoutSeconds = timeoutSeconds;
+   public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, boolean sorted) {
+      this(id, xid, storageManager, -1, sorted);
    }
 
-   public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager) {
+   private TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final int timeoutSeconds, boolean sorted) {
       this.storageManager = storageManager;
 
       this.xid = xid;
 
       this.id = id;
 
-      createTime = System.currentTimeMillis();
+      this.createTime = System.currentTimeMillis();
+
+      this.timeoutSeconds = timeoutSeconds;
+
+      this.sorted = sorted;
    }
 
    // Transaction implementation
@@ -217,7 +217,7 @@ public class TransactionImpl implements Transaction {
                   logger.trace("TransactionImpl::prepare::rollbackonly, rollingback " + this);
                }
 
-               internalRollback();
+               internalRollback(sorted);
 
                if (exception != null) {
                   throw exception;
@@ -276,7 +276,7 @@ public class TransactionImpl implements Transaction {
             return;
          }
          if (state == State.ROLLBACK_ONLY) {
-            internalRollback();
+            internalRollback(sorted);
 
             if (exception != null) {
                throw exception;
@@ -379,11 +379,11 @@ public class TransactionImpl implements Transaction {
             }
          }
 
-         internalRollback();
+         internalRollback(sorted);
       }
    }
 
-   private void internalRollback() throws Exception {
+   private void internalRollback(boolean sorted) throws Exception {
       if (logger.isTraceEnabled()) {
          logger.trace("TransactionImpl::internalRollback " + this);
       }
@@ -418,7 +418,7 @@ public class TransactionImpl implements Transaction {
 
          @Override
          public void done() {
-            afterRollback(operationsToComplete);
+            afterRollback(operationsToComplete, sorted);
          }
       });
 
@@ -432,7 +432,7 @@ public class TransactionImpl implements Transaction {
 
             @Override
             public void done() {
-               afterRollback(storeOperationsToComplete);
+               afterRollback(storeOperationsToComplete, sorted);
             }
          });
       }
@@ -562,10 +562,10 @@ public class TransactionImpl implements Transaction {
       }
    }
 
-   private synchronized void afterRollback(List<TransactionOperation> operationsToComplete) {
+   private synchronized void afterRollback(List<TransactionOperation> operationsToComplete, boolean sorted) {
       if (operationsToComplete != null) {
          for (TransactionOperation operation : operationsToComplete) {
-            operation.afterRollback(this);
+            operation.afterRollback(this, sorted);
          }
          // Help out GC here
          operationsToComplete.clear();
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index ea15264..b94bd3a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1036,6 +1036,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void addSorted(List<MessageReference> refs, boolean scheduling) {
+         addHead(refs, scheduling);
+      }
+
+      @Override
       public void reload(MessageReference ref) {
 
       }
@@ -1056,6 +1061,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void addSorted(MessageReference ref, boolean scheduling) {
+
+      }
+
+      @Override
       public void addHead(List<MessageReference> refs, boolean scheduling) {
          for (MessageReference ref : refs) {
             addFirst(ref);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index ee7bdbb..9858357 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -91,6 +91,11 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
+   public void close(boolean failed, boolean sorted) throws Exception {
+
+   }
+
+   @Override
    public void removeItself() throws Exception {
 
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
index dcc5a40..f4087d4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSOrderTest.java
@@ -129,4 +129,54 @@ public class JMSOrderTest extends JMSTestBase {
 
    }
 
+   @Test(timeout = 60000)
+   public void testReceiveSomeThenClose() throws Exception {
+      Connection connection = protocolCF.createConnection();
+      try {
+         connection.start();
+
+         int totalCount = 5;
+         int consumeBeforeRollback = 2;
+
+         sendToAmqQueue(totalCount);
+
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue = session.createQueue(name.getMethodName());
+         MessageConsumer consumer = session.createConsumer(queue);
+
+         for (int i = 1; i <= consumeBeforeRollback; i++) {
+            Message message = consumer.receive(3000);
+            assertNotNull(message);
+            assertEquals("Unexpected message number", i, message.getIntProperty("nr"));
+         }
+
+         session.close();
+
+         session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         queue = session.createQueue(name.getMethodName());
+         consumer = session.createConsumer(queue);
+
+         // Consume again.. the previously consumed messages should get delivered
+         // again after the rollback and then the remainder should follow
+         List<Integer> messageNumbers = new ArrayList<>();
+         for (int i = 1; i <= totalCount; i++) {
+            Message message = consumer.receive(3000);
+            assertNotNull("Failed to receive message: " + i, message);
+            int msgNum = message.getIntProperty("nr");
+            System.out.println("Received " + msgNum);
+            messageNumbers.add(msgNum);
+         }
+
+         session.commit();
+
+         assertEquals("Unexpected size of list", totalCount, messageNumbers.size());
+         for (int i = 0; i < messageNumbers.size(); i++) {
+            assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i));
+         }
+      } finally {
+         connection.close();
+      }
+
+   }
+
 }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 612d621..da25078 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -269,6 +269,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void addSorted(MessageReference ref, boolean scheduling) {
+
+   }
+
+   @Override
    public void addHead(List<MessageReference> ref, boolean scheduling) {
       // no-op
 
@@ -460,6 +465,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void addSorted(List<MessageReference> refs, boolean scheduling) {
+
+   }
+
+   @Override
    public Set<Consumer> getConsumers() {
       // no-op
       return null;


[activemq-artemis] 03/03: This closes #2807

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 17f867596465c575db4764136a099cf49127808e
Merge: 6fc1133 25d0b51
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Aug 21 10:09:15 2019 -0400

    This closes #2807

 .../artemis/utils/collections/LinkedListImpl.java  |  62 +++++++
 .../utils/collections/PriorityLinkedList.java      |   2 +
 .../utils/collections/PriorityLinkedListImpl.java  |  17 +-
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   4 +-
 .../proton/transaction/ProtonTransactionImpl.java  |   2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     |   2 +-
 .../apache/activemq/artemis/core/server/Queue.java |   9 +-
 .../artemis/core/server/ServerConsumer.java        |   4 +-
 .../core/server/cluster/impl/BridgeImpl.java       |   2 +-
 .../core/server/impl/MessageReferenceImpl.java     |  23 +++
 .../artemis/core/server/impl/QueueImpl.java        |  75 ++++++++-
 .../artemis/core/server/impl/RefsOperation.java    |   7 +-
 .../core/server/impl/ServerConsumerImpl.java       |  13 +-
 .../core/server/impl/ServerSessionImpl.java        |   2 +-
 .../core/transaction/TransactionOperation.java     |   4 +
 .../core/transaction/impl/TransactionImpl.java     |  62 +++----
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  12 +-
 .../tests/integration/cli/DummyServerConsumer.java |   7 +-
 .../tests/integration/client/JMSOrderTest.java     | 182 +++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  12 +-
 .../artemis/tests/unit/util/LinkedListTest.java    |  73 ++++++++-
 21 files changed, 523 insertions(+), 53 deletions(-)