You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/04/12 17:35:22 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6239

Repository: activemq
Updated Branches:
  refs/heads/master 485fcafcd -> c1b58d337


https://issues.apache.org/jira/browse/AMQ-6239

Refactor the iterator implementation in PrioritizedPendingList so that it
no longer copies every element into a temporary list and instead delegates
to the iterators of the per-priority lists.  Add some additional tests.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c1b58d33
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c1b58d33
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c1b58d33

Branch: refs/heads/master
Commit: c1b58d3373746eda525e6c3b3ab04eb747e9674b
Parents: 485fcaf
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Apr 12 11:35:03 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Apr 12 11:35:03 2016 -0400

----------------------------------------------------------------------
 .../region/cursors/PrioritizedPendingList.java  | 63 +++++++++++++-------
 .../cursors/PrioritizedPendingListTest.java     | 30 +++++++++-
 2 files changed, 69 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c1b58d33/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index deabd50..8a9bb17 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -16,20 +16,19 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
-import java.util.ArrayList;
+import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
+
+import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.management.SizeStatisticImpl;
 
-
-import static org.apache.activemq.broker.region.cursors.OrderedPendingList.getValues;
-
 public class PrioritizedPendingList implements PendingList {
 
     private static final Integer MAX_PRIORITY = 10;
@@ -121,38 +120,58 @@ public class PrioritizedPendingList implements PendingList {
         return lists[getPriority(msg)];
     }
 
-    private class PrioritizedPendingListIterator implements Iterator<MessageReference> {
-        private int index = 0;
-        private int currentIndex = 0;
-        List<PendingNode> list = new ArrayList<PendingNode>(size());
+    private final class PrioritizedPendingListIterator implements Iterator<MessageReference> {
+
+        private final Deque<Iterator<MessageReference>> iterators = new ArrayDeque<Iterator<MessageReference>>();
+
+        private Iterator<MessageReference> current;
+        private MessageReference currentMessage;
 
         PrioritizedPendingListIterator() {
-            for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
-                OrderedPendingList orderedPendingList = lists[i];
-                if (!orderedPendingList.isEmpty()) {
-                    list.addAll(orderedPendingList.getAsList());
+            for (OrderedPendingList list : lists) {
+                if (!list.isEmpty()) {
+                    iterators.push(list.iterator());
                 }
             }
+
+            current = iterators.poll();
         }
+
         @Override
         public boolean hasNext() {
-            return list.size() > index;
+            while (current != null) {
+                if (current.hasNext()) {
+                    return true;
+                } else {
+                    current = iterators.poll();
+                }
+            }
+
+            return false;
         }
 
         @Override
         public MessageReference next() {
-            PendingNode node = list.get(this.index);
-            this.currentIndex = this.index;
-            this.index++;
-            return node.getMessage();
+            MessageReference result = null;
+
+            while (current != null) {
+                if (current.hasNext()) {
+                    result = currentMessage = current.next();
+                    break;
+                } else {
+                    current = iterators.poll();
+                }
+            }
+
+            return result;
         }
 
         @Override
         public void remove() {
-            PendingNode node = list.get(this.currentIndex);
-            if (node != null) {
-                pendingMessageHelper.removeFromMap(node.getMessage());
-                node.getList().removeNode(node);
+            if (currentMessage != null) {
+                pendingMessageHelper.removeFromMap(currentMessage);
+                current.remove();
+                currentMessage = null;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/c1b58d33/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
index 80d090d..8ecc951 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingListTest.java
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.broker.region.cursors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Iterator;
@@ -187,7 +187,9 @@ public class PrioritizedPendingListTest {
         Iterator<MessageReference> iter = list.iterator();
         int lastId = list.size();
         while (iter.hasNext()) {
-            assertEquals(lastId--, iter.next().getMessage().getPriority());
+            MessageReference nextMessage = iter.next();
+            assertNotNull(nextMessage);
+            assertEquals(lastId--, nextMessage.getMessage().getPriority());
         }
     }
 
@@ -215,6 +217,30 @@ public class PrioritizedPendingListTest {
         }
     }
 
+    @Test
+    public void testFullRangeIteration() {
+        PrioritizedPendingList list = new PrioritizedPendingList();
+
+        int totalElements = 0;
+
+        for (int i = 0; i < 10; ++i) {
+            list.addMessageFirst(new TestMessageReference(totalElements++, i));
+            list.addMessageFirst(new TestMessageReference(totalElements++, i));
+        }
+
+        assertTrue(list.size() == totalElements);
+
+        int totalIterated = 0;
+        Iterator<MessageReference> iter = list.iterator();
+        while (iter.hasNext()) {
+            MessageReference nextMessage = iter.next();
+            assertNotNull(nextMessage);
+            totalIterated++;
+        }
+
+        assertEquals(totalElements, totalIterated);
+    }
+
     static class TestMessageReference implements MessageReference {
 
         private static final IdGenerator id = new IdGenerator();