You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2020/09/10 00:30:14 UTC

[kafka] branch trunk updated: MINOR: add ImplicitLinkedHashCollection#moveToEnd (#9269)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 86013dc  MINOR: add ImplicitLinkedHashCollection#moveToEnd (#9269)
86013dc is described below

commit 86013dc9f8cf02813418f41c3c49c745f9ee6ea5
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Wed Sep 9 17:29:12 2020 -0700

    MINOR: add ImplicitLinkedHashCollection#moveToEnd (#9269)
    
    Add ImplicitLinkedHashCollection#moveToEnd.
    
    Refactor ImplicitLinkedHashCollectionIterator to be a little bit more
    robust against concurrent modifications to the map (which admittedly
    should not happen.)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../common/utils/ImplicitLinkedHashCollection.java | 72 ++++++++++++++--------
 .../utils/ImplicitLinkedHashCollectionTest.java    | 20 +++++-
 2 files changed, 64 insertions(+), 28 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
index 629fdc9..decabe8 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
@@ -143,76 +143,82 @@ public class ImplicitLinkedHashCollection<E extends ImplicitLinkedHashCollection
     }
 
     private class ImplicitLinkedHashCollectionIterator implements ListIterator<E> {
-        private int cursor = 0;
-        private Element cur = head;
-        private int lastReturnedSlot = INVALID_INDEX;
+        private int index = 0;
+        private Element cur;
+        private Element lastReturned;
 
         ImplicitLinkedHashCollectionIterator(int index) {
+            this.cur = indexToElement(head, elements, head.next());
             for (int i = 0; i < index; ++i) {
-                cur = indexToElement(head, elements, cur.next());
-                cursor++;
+                next();
             }
+            this.lastReturned = null;
         }
 
         @Override
         public boolean hasNext() {
-            return cursor != size;
+            return cur != head;
         }
 
         @Override
         public boolean hasPrevious() {
-            return cursor != 0;
+            return indexToElement(head, elements, cur.prev()) != head;
         }
 
         @Override
         public E next() {
-            if (cursor == size) {
+            if (!hasNext()) {
                 throw new NoSuchElementException();
             }
-            lastReturnedSlot = cur.next();
-            cur = indexToElement(head, elements, cur.next());
-            ++cursor;
             @SuppressWarnings("unchecked")
             E returnValue = (E) cur;
+            lastReturned = cur;
+            cur = indexToElement(head, elements, cur.next());
+            ++index;
             return returnValue;
         }
 
         @Override
         public E previous() {
-            if (cursor == 0) {
+            Element prev = indexToElement(head, elements, cur.prev());
+            if (prev == head) {
                 throw new NoSuchElementException();
             }
+            cur = prev;
+            --index;
+            lastReturned = cur;
             @SuppressWarnings("unchecked")
             E returnValue = (E) cur;
-            cur = indexToElement(head, elements, cur.prev());
-            lastReturnedSlot = cur.next();
-            --cursor;
             return returnValue;
         }
 
         @Override
         public int nextIndex() {
-            return cursor;
+            return index;
         }
 
         @Override
         public int previousIndex() {
-            return cursor - 1;
+            return index - 1;
         }
 
         @Override
         public void remove() {
-            if (lastReturnedSlot == INVALID_INDEX) {
+            if (lastReturned == null) {
                 throw new IllegalStateException();
             }
-
-            if (cur == indexToElement(head, elements, lastReturnedSlot)) {
-                cursor--;
-                cur = indexToElement(head, elements, cur.prev());
+            Element nextElement = indexToElement(head, elements, lastReturned.next());
+            removeFromList(head, elements, nextElement.prev());
+            size--;
+            if (lastReturned == cur) {
+                // If the element we are removing was cur, set cur to cur->next.
+                cur = nextElement;
+            } else {
+                // If the element we are removing comes before cur, decrement the index,
+                // since there are now fewer entries before cur.
+                --index;
             }
-            ImplicitLinkedHashCollection.this.removeElementAtSlot(lastReturnedSlot);
-
-            lastReturnedSlot = INVALID_INDEX;
+            lastReturned = null;
         }
 
         @Override
@@ -562,6 +568,22 @@ public class ImplicitLinkedHashCollection<E extends ImplicitLinkedHashCollection
     }
 
     /**
+     * Moves an element which is already in the collection so that it comes last
+     * in iteration order.
+     */
+    final public void moveToEnd(E element) {
+        if (element.prev() == INVALID_INDEX || element.next() == INVALID_INDEX) {
+            throw new RuntimeException("Element " + element + " is not in the collection.");
+        }
+        Element prevElement = indexToElement(head, elements, element.prev());
+        Element nextElement = indexToElement(head, elements, element.next());
+        int slot = prevElement.next();
+        prevElement.setNext(element.next());
+        nextElement.setPrev(element.prev());
+        addToListTail(head, elements, slot);
+    }
+
+    /**
      * Removes all of the elements from this set, and resets the set capacity
      * based on the provided expected number of elements.
      */
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
index d23018d..01be3a6 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
@@ -502,7 +502,7 @@ public class ImplicitLinkedHashCollectionTest {
             addRandomElement(random, existing, coll);
             addRandomElement(random, existing, coll);
             addRandomElement(random, existing, coll);
-            removeRandomElement(random, existing, coll);
+            removeRandomElement(random, existing);
             expectTraversal(coll.iterator(), existing.iterator());
         }
     }
@@ -561,8 +561,7 @@ public class ImplicitLinkedHashCollectionTest {
     }
 
     @SuppressWarnings("unlikely-arg-type")
-    private void removeRandomElement(Random random, Collection<Integer> existing,
-                                     ImplicitLinkedHashCollection<TestElement> coll) {
+    private void removeRandomElement(Random random, Collection<Integer> existing) {
         int removeIdx = random.nextInt(existing.size());
         Iterator<Integer> iter = existing.iterator();
         Integer element = null;
@@ -582,4 +581,19 @@ public class ImplicitLinkedHashCollectionTest {
         assertFalse(element2.equals(element1));
         assertTrue(element2.elementKeysAreEqual(element1));
     }
+
+    @Test
+    public void testMoveToEnd() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        TestElement e1 = new TestElement(1, 1);
+        TestElement e2 = new TestElement(2, 2);
+        TestElement e3 = new TestElement(3, 3);
+        assertTrue(coll.add(e1));
+        assertTrue(coll.add(e2));
+        assertTrue(coll.add(e3));
+        coll.moveToEnd(e1);
+        expectTraversal(coll.iterator(), 2, 3, 1);
+        Assert.assertThrows(RuntimeException.class, () ->
+            coll.moveToEnd(new TestElement(4, 4)));
+    }
 }