You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 23:23:30 UTC

[kafka] branch 2.4 updated: KAFKA-9419: Fix possible integer overflow in CircularIterator (#7950)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 26e216e  KAFKA-9419: Fix possible integer overflow in CircularIterator (#7950)
26e216e is described below

commit 26e216e0cb361ed8584d2c8aec010b0f10eb2152
Author: belugabehr <12...@users.noreply.github.com>
AuthorDate: Wed May 6 19:02:26 2020 -0400

    KAFKA-9419: Fix possible integer overflow in CircularIterator (#7950)
    
    The CircularIterator class uses a wrapping index-based approach to iterate over a list. This can be an O(n^2) performance problem for a LinkedList. Also, the index counter itself is never reset; instead, a modulo is applied to it on every list access. Eventually the index counter may overflow to a negative value, which may cause a negative index read and an ArrayIndexOutOfBoundsException.
    
    This fix changes the implementation to avoid these two scenarios. It uses the Collection's Iterator instead of an index counter, so it no longer has to seek to the correct index on every access, thus avoiding the LinkedList performance issue.
    
    I have added unit tests to validate the new implementation.
    
    * KAFKA-9419: Integer Overflow Possible with CircularIterator
    * Added JavaDoc. Support null values in the underlying collection
    * Always return true for hasNext(). Add more JavaDoc
    * Use an advance method to load next value and always return true in hasNext()
    * Simplify test suite
    * Use assertThrows in tests and remove redundant 'this' identifier
    
    Co-authored-by: David Mollitor <dm...@apache.org>
    Co-authored-by: Konstantine Karantasis <ko...@confluent.io>
    
    Reviewers: Ron Dagostino <rd...@confluent.io>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/common/utils/CircularIterator.java       | 71 +++++++++++++++++++---
 .../kafka/common/utils/CircularIteratorTest.java   | 66 ++++++++++++++++++++
 2 files changed, 128 insertions(+), 9 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
index 3e7d5eb..925f4ad 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/CircularIterator.java
@@ -14,22 +14,54 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.kafka.common.utils;
 
+import java.util.Collection;
+import java.util.ConcurrentModificationException;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Objects;
 
+/**
+ * An iterator that cycles through the {@code Iterator} of a {@code Collection}
+ * indefinitely. Useful for tasks such as round-robin load balancing. This class
+ * does not provide thread-safe access. This {@code Iterator} supports
+ * {@code null} elements in the underlying {@code Collection}. This
+ * {@code Iterator} does not support any modification to the underlying
+ * {@code Collection} after it has been wrapped by this class. Changing the
+ * underlying {@code Collection} may cause a
+ * {@link ConcurrentModificationException} or some other undefined behavior.
+ */
 public class CircularIterator<T> implements Iterator<T> {
-    int i = 0;
-    private List<T> list;
 
-    public CircularIterator(List<T> list) {
-        if (list.isEmpty()) {
+    private final Iterable<T> iterable;
+    private Iterator<T> iterator;
+    private T nextValue;
+
+    /**
+     * Create a new instance of a CircularIterator. The ordering of this
+     * Iterator will be dictated by the Iterator returned by Collection itself.
+     *
+     * @param col The collection to iterate indefinitely
+     *
+     * @throws NullPointerException if col is {@code null}
+     * @throws IllegalArgumentException if col is empty.
+     */
+    public CircularIterator(final Collection<T> col) {
+        this.iterable = Objects.requireNonNull(col);
+        this.iterator = col.iterator();
+        if (col.isEmpty()) {
             throw new IllegalArgumentException("CircularIterator can only be used on non-empty lists");
         }
-        this.list = list;
+        this.nextValue = advance();
     }
 
+    /**
+     * Returns true since the iteration will forever cycle through the provided
+     * {@code Collection}.
+     *
+     * @return Always true
+     */
     @Override
     public boolean hasNext() {
         return true;
@@ -37,13 +69,34 @@ public class CircularIterator<T> implements Iterator<T> {
 
     @Override
     public T next() {
-        T next = list.get(i);
-        i = (i + 1) % list.size();
+        final T next = nextValue;
+        nextValue = advance();
         return next;
     }
 
+    /**
+     * Return the next value in the {@code Iterator}, restarting the
+     * {@code Iterator} if necessary.
+     *
+     * @return The next value in the iterator
+     */
+    private T advance() {
+        if (!iterator.hasNext()) {
+            iterator = iterable.iterator();
+        }
+        return iterator.next();
+    }
+
+    /**
+     * Peek at the next value in the Iterator. Calling this method multiple
+     * times will return the same element without advancing this Iterator. The
+     * value returned by this method will be the next item returned by
+     * {@code next()}.
+     *
+     * @return The next value in this {@code Iterator}
+     */
     public T peek() {
-        return list.get(i);
+        return nextValue;
     }
 
     @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/CircularIteratorTest.java b/clients/src/test/java/org/apache/kafka/common/utils/CircularIteratorTest.java
new file mode 100644
index 0000000..a37c85e
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/CircularIteratorTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import org.junit.Test;
+
+public class CircularIteratorTest {
+
+    @Test
+    public void testNullCollection() {
+        assertThrows(NullPointerException.class, () -> new CircularIterator<>(null));
+    }
+
+    @Test
+    public void testEmptyCollection() {
+        assertThrows(IllegalArgumentException.class, () -> new CircularIterator<>(Collections.emptyList()));
+    }
+
+    @Test()
+    public void testCycleCollection() {
+        final CircularIterator<String> it = new CircularIterator<>(Arrays.asList("A", "B", null, "C"));
+
+        assertEquals("A", it.peek());
+        assertTrue(it.hasNext());
+        assertEquals("A", it.next());
+        assertEquals("B", it.peek());
+        assertTrue(it.hasNext());
+        assertEquals("B", it.next());
+        assertEquals(null, it.peek());
+        assertTrue(it.hasNext());
+        assertEquals(null, it.next());
+        assertEquals("C", it.peek());
+        assertTrue(it.hasNext());
+        assertEquals("C", it.next());
+        assertEquals("A", it.peek());
+        assertTrue(it.hasNext());
+        assertEquals("A", it.next());
+        assertEquals("B", it.peek());
+
+        // Check that peek does not have any side-effects
+        assertEquals("B", it.peek());
+    }
+
+}