You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:44 UTC
[15/34] activemq-artemis git commit: ARTEMIS-828 Queue browsing can be out of sync while paging
ARTEMIS-828 Queue browsing can be out of sync while paging
https://issues.apache.org/jira/browse/ARTEMIS-828
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bfb9bedb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bfb9bedb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bfb9bedb
Branch: refs/heads/ARTEMIS-780
Commit: bfb9bedb2d7a3d8ab5e336509915eb6ecafaefb3
Parents: e002125
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Oct 27 17:30:34 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Oct 28 16:54:58 2016 -0400
----------------------------------------------------------------------
.../core/management/impl/QueueControlImpl.java | 8 +--
.../core/paging/cursor/PageSubscription.java | 5 +-
.../impl/PageSubscriptionCounterImpl.java | 1 -
.../cursor/impl/PageSubscriptionImpl.java | 21 ++++--
.../activemq/artemis/core/server/Queue.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 67 ++++++++++++++------
.../core/server/impl/ScaleDownHandler.java | 4 +-
.../core/server/impl/ServerConsumerImpl.java | 2 +-
.../impl/ScheduledDeliveryHandlerTest.java | 2 +-
.../integration/paging/PagingSendTest.java | 4 +-
.../unit/core/postoffice/impl/FakeQueue.java | 2 +-
.../unit/core/server/impl/QueueImplTest.java | 2 +-
12 files changed, 81 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index cfa8aa5..85bad25 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -410,7 +410,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
- try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
+ try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
@@ -446,7 +446,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
try {
List<Map<String, Object>> messages = new ArrayList<>();
queue.flushExecutor();
- try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
+ try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
// returns just the first, as it's the first only
if (iterator.hasNext()) {
MessageReference ref = iterator.next();
@@ -499,7 +499,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
if (filter == null) {
return getMessageCount();
} else {
- try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
+ try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
int count = 0;
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
@@ -895,7 +895,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
ArrayList<CompositeData> c = new ArrayList<>();
Filter filter = FilterImpl.createFilter(filterStr);
queue.flushExecutor();
- try (LinkedListIterator<MessageReference> iterator = queue.totalIterator()) {
+ try (LinkedListIterator<MessageReference> iterator = queue.browserIterator()) {
while (iterator.hasNext() && currentPageSize++ < pageSize) {
MessageReference ref = iterator.next();
if (filter == null || filter.match(ref.getMessage())) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index 89c6d44..6e569c1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -56,7 +56,10 @@ public interface PageSubscription {
LinkedListIterator<PagedReference> iterator();
- // To be called when the cursor is closed for good. Most likely when the queue is deleted
+ LinkedListIterator<PagedReference> iterator(boolean jumpRemoves);
+
+
+ // To be called when the cursor is closed for good. Most likely when the queue is deleted
void destroy() throws Exception;
void scheduleCleanupCheck();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
index e01098d..01ad778 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
@@ -251,7 +251,6 @@ public class PageSubscriptionCounterImpl implements PageSubscriptionCounter {
recordID = -1;
value.set(0);
- added.set(0);
incrementRecords.clear();
}
} finally {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index c1c54a2..063722c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -351,6 +351,11 @@ final class PageSubscriptionImpl implements PageSubscription {
return new CursorIterator();
}
+ @Override
+ public PageIterator iterator(boolean browsing) {
+ return new CursorIterator(browsing);
+ }
+
private PagedReference internalGetNext(final PagePosition pos) {
PagePosition retPos = pos.nextMessage();
@@ -1100,6 +1105,8 @@ final class PageSubscriptionImpl implements PageSubscription {
private volatile PagedReference lastRedelivery = null;
+ private final boolean browsing;
+
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final java.util.Queue<PagePosition> redeliveries = new LinkedList<>();
@@ -1109,7 +1116,13 @@ final class PageSubscriptionImpl implements PageSubscription {
*/
private volatile PagedReference cachedNext;
+ private CursorIterator(boolean browsing) {
+ this.browsing = browsing;
+ }
+
+
private CursorIterator() {
+ this.browsing = false;
}
@Override
@@ -1199,7 +1212,7 @@ final class PageSubscriptionImpl implements PageSubscription {
PageCursorInfo info = getPageInfo(message.getPosition().getPageNr());
- if (info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
+ if (!browsing && info != null && (info.isRemoved(message.getPosition()) || info.getCompleteInfo() != null)) {
continue;
}
@@ -1225,7 +1238,7 @@ final class PageSubscriptionImpl implements PageSubscription {
// nothing
// is being changed. That's why the false is passed as a parameter here
- if (info != null && info.isRemoved(message.getPosition())) {
+ if (!browsing && info != null && info.isRemoved(message.getPosition())) {
valid = false;
}
}
@@ -1237,10 +1250,10 @@ final class PageSubscriptionImpl implements PageSubscription {
if (valid) {
match = match(message.getMessage());
- if (!match) {
+ if (!browsing && !match) {
processACK(message.getPosition());
}
- } else if (ignored) {
+ } else if (!browsing && ignored) {
positionIgnored(message.getPosition());
}
} while (!match);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 0dcef3d..52cd2f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -195,7 +195,7 @@ public interface Queue extends Bindable {
*/
LinkedListIterator<MessageReference> iterator();
- LinkedListIterator<MessageReference> totalIterator();
+ LinkedListIterator<MessageReference> browserIterator();
SimpleString getExpiryAddress();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index b70fe8d..56a33ef 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -867,8 +867,8 @@ public class QueueImpl implements Queue {
}
@Override
- public TotalQueueIterator totalIterator() {
- return new TotalQueueIterator();
+ public QueueBrowserIterator browserIterator() {
+ return new QueueBrowserIterator();
}
@Override
@@ -2863,17 +2863,23 @@ public class QueueImpl implements Queue {
//Readonly (no remove) iterator over the messages in the queue, in order of
//paging store, intermediateMessageReferences and MessageReferences
- private class TotalQueueIterator implements LinkedListIterator<MessageReference> {
+ private class QueueBrowserIterator implements LinkedListIterator<MessageReference> {
- LinkedListIterator<PagedReference> pageIter = null;
+ LinkedListIterator<PagedReference> pagingIterator = null;
LinkedListIterator<MessageReference> messagesIterator = null;
+ private LinkedListIterator<PagedReference> getPagingIterator() {
+ if (pagingIterator == null) {
+ pagingIterator = pageSubscription.iterator(true);
+ }
+ return pagingIterator;
+ }
+
Iterator lastIterator = null;
- private TotalQueueIterator() {
- if (pageSubscription != null) {
- pageIter = pageSubscription.iterator();
- }
+ MessageReference cachedNext = null;
+
+ private QueueBrowserIterator() {
messagesIterator = new SynchronizedIterator(messageReferences.iterator());
}
@@ -2883,9 +2889,9 @@ public class QueueImpl implements Queue {
lastIterator = messagesIterator;
return true;
}
- if (pageIter != null) {
- if (pageIter.hasNext()) {
- lastIterator = pageIter;
+ if (getPagingIterator() != null) {
+ if (getPagingIterator().hasNext()) {
+ lastIterator = getPagingIterator();
return true;
}
}
@@ -2893,16 +2899,37 @@ public class QueueImpl implements Queue {
return false;
}
+
+
@Override
public MessageReference next() {
- if (messagesIterator != null && messagesIterator.hasNext()) {
- MessageReference msg = messagesIterator.next();
- return msg;
+
+ if (cachedNext != null) {
+ try {
+ return cachedNext;
+ } finally {
+ cachedNext = null;
+ }
+
}
- if (pageIter != null) {
- if (pageIter.hasNext()) {
- lastIterator = pageIter;
- return pageIter.next();
+ while (true) {
+ if (messagesIterator != null && messagesIterator.hasNext()) {
+ MessageReference msg = messagesIterator.next();
+ if (msg.isPaged()) {
+ System.out.println("** Rejecting because it's paged " + msg.getMessage());
+ continue;
+ }
+// System.out.println("** Returning because it's not paged " + msg.getMessage());
+ return msg;
+ } else {
+ break;
+ }
+ }
+ if (getPagingIterator() != null) {
+ if (getPagingIterator().hasNext()) {
+ lastIterator = getPagingIterator();
+ MessageReference ref = getPagingIterator().next();
+ return ref;
}
}
@@ -2922,8 +2949,8 @@ public class QueueImpl implements Queue {
@Override
public void close() {
- if (pageIter != null) {
- pageIter.close();
+ if (getPagingIterator() != null) {
+ getPagingIterator().close();
}
if (messagesIterator != null) {
messagesIterator.close();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
index b763ff2..dc62676 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScaleDownHandler.java
@@ -165,7 +165,7 @@ public class ScaleDownHandler {
for (Queue loopQueue : queues) {
logger.debug("Scaling down messages on address " + address + " / performing loop on queue " + loopQueue);
- try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.totalIterator()) {
+ try (LinkedListIterator<MessageReference> messagesIterator = loopQueue.browserIterator()) {
while (messagesIterator.hasNext()) {
MessageReference messageReference = messagesIterator.next();
@@ -249,7 +249,7 @@ public class ScaleDownHandler {
for (Queue queue : queues) {
// using auto-closeable
- try (LinkedListIterator<MessageReference> messagesIterator = queue.totalIterator()) {
+ try (LinkedListIterator<MessageReference> messagesIterator = queue.browserIterator()) {
// loop through every message of this queue
while (messagesIterator.hasNext()) {
MessageReference messageRef = messagesIterator.next();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 1318ff3..98a9c84 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -206,7 +206,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.creationTime = System.currentTimeMillis();
if (browseOnly) {
- browserDeliverer = new BrowserDeliverer(messageQueue.totalIterator());
+ browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
} else {
messageQueue.addConsumer(this);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index d82f7d3..55a287a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1212,7 +1212,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public LinkedListIterator<MessageReference> totalIterator() {
+ public LinkedListIterator<MessageReference> browserIterator() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
index ca8a9a1..1f0d7e0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingSendTest.java
@@ -308,7 +308,7 @@ public class PagingSendTest extends ActiveMQTestBase {
* duplicates that may have happened before this point).
*/
public void checkBatchMessagesAreNotPagedTwice(Queue queue) throws Exception {
- LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
+ LinkedListIterator<MessageReference> pageIterator = queue.browserIterator();
Set<String> messageOrderSet = new HashSet<>();
@@ -344,7 +344,7 @@ public class PagingSendTest extends ActiveMQTestBase {
* duplicates that may have happened before this point).
*/
protected int processCountThroughIterator(Queue queue) throws Exception {
- LinkedListIterator<MessageReference> pageIterator = queue.totalIterator();
+ LinkedListIterator<MessageReference> pageIterator = queue.browserIterator();
int count = 0;
while (pageIterator.hasNext()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index bba5dc1..9a20d70 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -604,7 +604,7 @@ public class FakeQueue implements Queue {
}
@Override
- public LinkedListIterator<MessageReference> totalIterator() {
+ public LinkedListIterator<MessageReference> browserIterator() {
// TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bfb9bedb/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index b9bdba7..804429f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1274,7 +1274,7 @@ public class QueueImplTest extends ActiveMQTestBase {
locator.close();
Queue queue = ((LocalQueueBinding) server.getPostOffice().getBinding(new SimpleString(MY_QUEUE))).getQueue();
- LinkedListIterator<MessageReference> totalIterator = queue.totalIterator();
+ LinkedListIterator<MessageReference> totalIterator = queue.browserIterator();
try {
int i = 0;