You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:44 UTC

[15/34] activemq-artemis git commit: ARTEMIS-828 Queue browsing can be out of sync while paging

ARTEMIS-828 Queue browsing can be out of sync while paging

https://issues.apache.org/jira/browse/ARTEMIS-828


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bfb9bedb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bfb9bedb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bfb9bedb

Branch: refs/heads/ARTEMIS-780
Commit: bfb9bedb2d7a3d8ab5e336509915eb6ecafaefb3
Parents: e002125
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Oct 27 17:30:34 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 28 16:54:58 2016 -0400

----------------------------------------------------------------------
 .../core/management/impl/QueueControlImpl.java  |  8 +--
 .../core/paging/cursor/PageSubscription.java    |  5 +-
 .../impl/PageSubscriptionCounterImpl.java       |  1 -
 .../cursor/impl/PageSubscriptionImpl.java       | 21 ++++--
 .../activemq/artemis/core/server/Queue.java     |  2 +-
 .../artemis/core/server/impl/QueueImpl.java     | 67 ++++++++++++++------
 .../core/server/impl/ScaleDownHandler.java      |  4 +-
 .../core/server/impl/ServerConsumerImpl.java    |  2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  2 +-
 .../integration/paging/PagingSendTest.java      |  4 +-
 .../unit/core/postoffice/impl/FakeQueue.java    |  2 +-
 .../unit/core/server/impl/QueueImplTest.java    |  2 +-
 12 files changed, 81 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index cfa8aa5..85bad25 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -410,7 +410,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          Filter filter = FilterImpl.createFilter(filterStr);
          List<Map<String, Object>> messages = new ArrayList<>();
          queue.flushExecutor();
-         try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
+         try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
             while (iterator.hasNext()) {
                MessageReference ref = iterator.next();
                if (filter == null || filter.match(ref.getMessage())) {
@@ -446,7 +446,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
       try {
          List<Map<String, Object>> messages = new ArrayList<>();
          queue.flushExecutor();
-         try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
+         try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
             // returns just the first, as it's the first only
             if (iterator.hasNext()) {
                MessageReference ref = iterator.next();
@@ -499,7 +499,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          if (filter == null) {
             return getMessageCount();
          } else {
-            try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
+            try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
                int count = 0;
                while (iterator.hasNext()) {
                   MessageReference ref = iterator.next();
@@ -895,7 +895,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
          ArrayList<CompositeData> c = new ArrayList<>();
          Filter filter = FilterImpl.createFilter(filterStr);
          queue.flushExecutor();
-         try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
+         try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
             while (iterator.hasNext() && currentPageSize++ < pageSize) {
                MessageReference ref = iterator.next();
                if (filter == null || filter.match(ref.getMessage())) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 89c6d44..6e569c1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -56,7 +56,10 @@ public interface PageSubscription {
 
    LinkedListIterator<PagedReference> iterator();
 
-   // To be called when the cursor is closed for good. Most likely when the queue is deleted
+   LinkedListIterator<PagedReference> iterator(boolean browsing); // browsing=true: iterate without skipping removed entries and without acking non-matching ones
+
+
+      // To be called when the cursor is closed for good. Most likely when the queue is deleted
    void destroy() throws Exception;
 
    void scheduleCleanupCheck();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index e01098d..01ad778 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -251,7 +251,6 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
 
             recordID = -1;
             value.set(0);
-            added.set(0);
             incrementRecords.clear();
          }
       } finally {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index c1c54a2..063722c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -351,6 +351,11 @@ final class PageSubscriptionImpl implements PageSubscription {
       return new CursorIterator();
    }
 
+   @Override
+   public PageIterator iterator(boolean browsing) {
+      return new CursorIterator(browsing);
+   }
+
    private PagedReference internalGetNext(final PagePosition pos) {
       PagePosition retPos = pos.nextMessage();
 
@@ -1100,6 +1105,8 @@ final class PageSubscriptionImpl implements PageSubscription {
 
       private volatile PagedReference lastRedelivery = null;
 
+      private final boolean browsing;
+
       // We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
       private final java.util.Queue<PagePosition> redeliveries = new LinkedList<>();
 
@@ -1109,7 +1116,13 @@ final class PageSubscriptionImpl implements PageSubscription {
        */
       private volatile PagedReference cachedNext;
 
+      private CursorIterator(boolean browsing) {
+         this.browsing = browsing;
+      }
+
+
       private CursorIterator() {
+         this.browsing = false;
       }
 
       @Override
@@ -1199,7 +1212,7 @@ final class PageSubscriptionImpl implements PageSubscription {
 
                PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
 
-               if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
+               if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
                   continue;
                }
 
@@ -1225,7 +1238,7 @@ final class PageSubscriptionImpl implements PageSubscription {
                   // nothing
                   // is being changed. That's why the false is passed as a parameter here
 
-                  if (info != null && info.isRemoved(message.getPosition())) {
+                  if (!browsing && info != null && info.isRemoved(message.getPosition())) {
                      valid = false;
                   }
                }
@@ -1237,10 +1250,10 @@ final class PageSubscriptionImpl implements PageSubscription {
                if (valid) {
                   match = match(message.getMessage());
 
-                  if (!match) {
+                  if (!browsing && !match) {
                      processACK(message.getPosition());
                   }
-               } else if (ignored) {
+               } else if (!browsing && ignored) {
                   positionIgnored(message.getPosition());
                }
             } while (!match);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 0dcef3d..52cd2f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -195,7 +195,7 @@ public interface Queue extends Bindable {
     */
    LinkedListIterator<MessageReference> iterator();
 
-   LinkedListIterator<MessageReference> totalIterator();
+   LinkedListIterator<MessageReference> browserIterator();
 
    SimpleString getExpiryAddress();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index b70fe8d..56a33ef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -867,8 +867,8 @@ public class QueueImpl implements Queue {
    }
 
    @Override
-   public TotalQueueIterator totalIterator() {
-      return new TotalQueueIterator();
+   public QueueBrowserIterator browserIterator() {
+      return new QueueBrowserIterator();
    }
 
    @Override
@@ -2863,17 +2863,23 @@ public class QueueImpl implements Queue {
 
    //Readonly (no remove) iterator over the messages in the queue, in order of
    //paging store, intermediateMessageReferences and MessageReferences
-   private class TotalQueueIterator implements LinkedListIterator<MessageReference> {
+   private class QueueBrowserIterator implements LinkedListIterator<MessageReference> {
 
-      LinkedListIterator<PagedReference> pageIter = null;
+      LinkedListIterator<PagedReference> pagingIterator = null;
       LinkedListIterator<MessageReference> messagesIterator = null;
 
+      private LinkedListIterator<PagedReference> getPagingIterator() { // lazily creates the browsing page iterator on first use
+         if (pagingIterator == null && pageSubscription != null) { // no page subscription: stay null, callers null-check the result
+            pagingIterator = pageSubscription.iterator(true);
+         }
+         return pagingIterator;
+      }
+
       Iterator lastIterator = null;
 
-      private TotalQueueIterator() {
-         if (pageSubscription != null) {
-            pageIter = pageSubscription.iterator();
-         }
+      MessageReference cachedNext = null;
+
+      private QueueBrowserIterator() {
          messagesIterator = new SynchronizedIterator(messageReferences.iterator());
       }
 
@@ -2883,9 +2889,9 @@ public class QueueImpl implements Queue {
             lastIterator = messagesIterator;
             return true;
          }
-         if (pageIter != null) {
-            if (pageIter.hasNext()) {
-               lastIterator = pageIter;
+         if (getPagingIterator() != null) {
+            if (getPagingIterator().hasNext()) {
+               lastIterator = getPagingIterator();
                return true;
             }
          }
@@ -2893,16 +2899,37 @@ public class QueueImpl implements Queue {
          return false;
       }
 
+
+
       @Override
       public MessageReference next() {
-         if (messagesIterator != null && messagesIterator.hasNext()) {
-            MessageReference msg = messagesIterator.next();
-            return msg;
+
+         if (cachedNext != null) {
+            try {
+               return cachedNext;
+            } finally {
+               cachedNext = null;
+            }
+
          }
-         if (pageIter != null) {
-            if (pageIter.hasNext()) {
-               lastIterator = pageIter;
-               return pageIter.next();
+         while (true) {
+            if (messagesIterator != null && messagesIterator.hasNext()) {
+               MessageReference msg = messagesIterator.next();
+               if (msg.isPaged()) {
+                  System.out.println("** Rejecting because it's paged " + msg.getMessage());
+                  continue;
+               }
+//               System.out.println("** Returning because it's not paged " + msg.getMessage());
+               return msg;
+            } else {
+               break;
+            }
+         }
+         if (getPagingIterator() != null) {
+            if (getPagingIterator().hasNext()) {
+               lastIterator = getPagingIterator();
+               MessageReference ref = getPagingIterator().next();
+               return ref;
             }
          }
 
@@ -2922,8 +2949,8 @@ public class QueueImpl implements Queue {
 
       @Override
       public void close() {
-         if (pageIter != null) {
-            pageIter.close();
+         if (getPagingIterator() != null) {
+            getPagingIterator().close();
          }
          if (messagesIterator != null) {
             messagesIterator.close();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index b763ff2..dc62676 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -165,7 +165,7 @@ public class ScaleDownHandler {
          for (Queue loopQueue : queues) {
             logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue);
 
-            try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.totalIterator()) {
+            try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.browserIterator()) {
 
                while (messagesIterator.hasNext()) {
                   MessageReference messageReference = messagesIterator.next();
@@ -249,7 +249,7 @@ public class ScaleDownHandler {
 
       for (Queue queue : queues) {
          // using auto-closeable
-         try (LinkedListIterator<MessageReference> messagesIterator = queue.totalIterator()) {
+         try (LinkedListIterator<MessageReference> messagesIterator = queue.browserIterator()) {
             // loop through every message of this queue
             while (messagesIterator.hasNext()) {
                MessageReference messageRef = messagesIterator.next();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 1318ff3..98a9c84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -206,7 +206,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       this.creationTime = System.currentTimeMillis();
 
       if (browseOnly) {
-         browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
+         browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
       } else {
          messageQueue.addConsumer(this);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index d82f7d3..55a287a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1212,7 +1212,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public LinkedListIterator<MessageReference> totalIterator() {
+      public LinkedListIterator<MessageReference> browserIterator() {
          return null;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
index ca8a9a1..1f0d7e0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
@@ -308,7 +308,7 @@ public class PagingSendTest extends ActiveMQTestBase {
     * duplicates that may have happened before this point).
     */
    public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception {
-      LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
+      LinkedListIterator<MessageReference> pageIterator = queue.browserIterator();
 
       Set<String> messageOrderSet = new HashSet<>();
 
@@ -344,7 +344,7 @@ public class PagingSendTest extends ActiveMQTestBase {
     * duplicates that may have happened before this point).
     */
    protected int processCountThroughIterator(Queue queue) throws Exception {
-      LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
+      LinkedListIterator<MessageReference> pageIterator = queue.browserIterator();
 
       int count = 0;
       while (pageIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index bba5dc1..9a20d70 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -604,7 +604,7 @@ public class FakeQueue implements Queue {
    }
 
    @Override
-   public LinkedListIterator<MessageReference> totalIterator() {
+   public LinkedListIterator<MessageReference> browserIterator() {
       // TODO Auto-generated method stub
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index b9bdba7..804429f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1274,7 +1274,7 @@ public class QueueImplTest extends ActiveMQTestBase {
       locator.close();
 
       Queue queue = ((LocalQueueBinding) server.getPostOffice().getBinding(new SimpleString(MY_QUEUE))).getQueue();
-      LinkedListIterator<MessageReference> totalIterator = queue.totalIterator();
+      LinkedListIterator<MessageReference> totalIterator = queue.browserIterator();
 
       try {
          int i = 0;