You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/04/12 17:35:22 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6239
Repository: activemq
Updated Branches:
refs/heads/master 485fcafcd -> c1b58d337
https://issues.apache.org/jira/browse/AMQ-6239
Refactor the iterator implementation in the PrioritizedPendingList to
not copy elements and instead use the level iterators. Add some
additional tests.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c1b58d33
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c1b58d33
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c1b58d33
Branch: refs/heads/master
Commit: c1b58d3373746eda525e6c3b3ab04eb747e9674b
Parents: 485fcaf
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Apr 12 11:35:03 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Apr 12 11:35:03 2016 -0400
----------------------------------------------------------------------
.../region/cursors/PrioritizedPendingList.java | 63 +++++++++++++-------
.../cursors/PrioritizedPendingListTest.java | 30 +++++++++-
2 files changed, 69 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c1b58d33/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index deabd50..8a9bb17 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -16,20 +16,19 @@
*/
package org.apache.activemq.broker.region.cursors;
-import java.util.ArrayList;
+import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
+
+import java.util.ArrayDeque;
import java.util.Collection;
+import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.management.SizeStatisticImpl;
-
-import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
-
public class PrioritizedPendingList implements PendingList {
private static final Integer MAX_PRIORITY = 10;
@@ -121,38 +120,58 @@ public class PrioritizedPendingList implements PendingList {
return lists[getPriority(msg)];
}
- private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
- private int index = 0;
- private int currentIndex = 0;
- List<PendingNode> list = new ArrayList<PendingNode>(size());
+ private final class PrioritizedPendingListIterator implements Iterator<MessageReference> {
+
+ private final Deque<Iterator<MessageReference>> iterators = new ArrayDeque<Iterator<MessageReference>>();
+
+ private Iterator<MessageReference> current;
+ private MessageReference currentMessage;
PrioritizedPendingListIterator() {
- for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
- OrderedPendingList orderedPendingList = lists[i];
- if (!orderedPendingList.isEmpty()) {
- list.addAll(orderedPendingList.getAsList());
+ for (OrderedPendingList list : lists) {
+ if (!list.isEmpty()) {
+ iterators.push(list.iterator());
}
}
+
+ current = iterators.poll();
}
+
@Override
public boolean hasNext() {
- return list.size() > index;
+ while (current != null) {
+ if (current.hasNext()) {
+ return true;
+ } else {
+ current = iterators.poll();
+ }
+ }
+
+ return false;
}
@Override
public MessageReference next() {
- PendingNode node = list.get(this.index);
- this.currentIndex = this.index;
- this.index++;
- return node.getMessage();
+ MessageReference result = null;
+
+ while (current != null) {
+ if (current.hasNext()) {
+ result = currentMessage = current.next();
+ break;
+ } else {
+ current = iterators.poll();
+ }
+ }
+
+ return result;
}
@Override
public void remove() {
- PendingNode node = list.get(this.currentIndex);
- if (node != null) {
- pendingMessageHelper.removeFromMap(node.getMessage());
- node.getList().removeNode(node);
+ if (currentMessage != null) {
+ pendingMessageHelper.removeFromMap(currentMessage);
+ current.remove();
+ currentMessage = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/c1b58d33/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
index 80d090d..8ecc951 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
@@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.broker.region.cursors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Iterator;
@@ -187,7 +187,9 @@ public class PrioritizedPendingListTest {
Iterator<MessageReference> iter = list.iterator();
int lastId = list.size();
while (iter.hasNext()) {
- assertEquals(lastId--, iter.next().getMessage().getPriority());
+ MessageReference nextMessage = iter.next();
+ assertNotNull(nextMessage);
+ assertEquals(lastId--, nextMessage.getMessage().getPriority());
}
}
@@ -215,6 +217,30 @@ public class PrioritizedPendingListTest {
}
}
+ @Test
+ public void testFullRangeIteration() {
+ PrioritizedPendingList list = new PrioritizedPendingList();
+
+ int totalElements = 0;
+
+ for (int i = 0; i < 10; ++i) {
+ list.addMessageFirst(new TestMessageReference(totalElements++, i));
+ list.addMessageFirst(new TestMessageReference(totalElements++, i));
+ }
+
+ assertTrue(list.size() == totalElements);
+
+ int totalIterated = 0;
+ Iterator<MessageReference> iter = list.iterator();
+ while (iter.hasNext()) {
+ MessageReference nextMessage = iter.next();
+ assertNotNull(nextMessage);
+ totalIterated++;
+ }
+
+ assertEquals(totalElements, totalIterated);
+ }
+
static class TestMessageReference implements MessageReference {
private static final IdGenerator id = new IdGenerator();