You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/01 20:07:48 UTC

kafka git commit: MINOR: improve MinTimestampTrackerTest and fix NPE when null element removed

Repository: kafka
Updated Branches:
  refs/heads/trunk 0fba52960 -> b380a82d5


MINOR: improve MinTimestampTrackerTest and fix NPE when null element removed

Author: Damian Guy <da...@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2611 from dguy/testing


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b380a82d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b380a82d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b380a82d

Branch: refs/heads/trunk
Commit: b380a82d5be7c68141590467911ecb61db45ed1e
Parents: 0fba529
Author: Damian Guy <da...@gmail.com>
Authored: Wed Mar 1 12:07:46 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Mar 1 12:07:46 2017 -0800

----------------------------------------------------------------------
 .../internals/MinTimestampTracker.java          |  15 ++-
 .../internals/MinTimestampTrackerTest.java      | 102 ++++++++-----------
 2 files changed, 55 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b380a82d/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
index ef7d990..a67675c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -34,7 +34,7 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
     /**
      * @throws NullPointerException if the element is null
      */
-    public void addElement(Stamped<E> elem) {
+    public void addElement(final Stamped<E> elem) {
         if (elem == null) throw new NullPointerException();
 
         Stamped<E> minElem = descendingSubsequence.peekLast();
@@ -45,12 +45,19 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
         descendingSubsequence.offerLast(elem);
     }
 
-    public void removeElement(Stamped<E> elem) {
-        if (elem != null && descendingSubsequence.peekFirst() == elem)
+    public void removeElement(final Stamped<E> elem) {
+        if (elem == null) {
+            return;
+        }
+
+        if (descendingSubsequence.peekFirst() == elem) {
             descendingSubsequence.removeFirst();
+        }
 
-        if (descendingSubsequence.isEmpty())
+        if (descendingSubsequence.isEmpty()) {
             lastKnownTime = elem.timestamp;
+        }
+
     }
 
     public int size() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b380a82d/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
index c398dc5..f6a1518 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MinTimestampTrackerTest.java
@@ -16,77 +16,63 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
 
 import org.junit.Test;
 
 public class MinTimestampTrackerTest {
 
-    private Stamped<String> elem(long timestamp) {
-        return new Stamped<>("", timestamp);
-    }
+    private MinTimestampTracker<String> tracker = new MinTimestampTracker<>();
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void testTracking() {
-        TimestampTracker<String> tracker = new MinTimestampTracker<>();
-
-        Object[] elems = new Object[]{
-            elem(100), elem(101), elem(102), elem(98), elem(99), elem(100)
-        };
-
-        int insertionIndex = 0;
-        int removalIndex = 0;
-
-        // add 100
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(100L, tracker.get());
-
-        // add 101
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(100L, tracker.get());
-
-        // remove 100
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(101L, tracker.get());
-
-        // add 102
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(101L, tracker.get());
-
-        // add 98
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // add 99
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
-
-        // add 100
-        tracker.addElement((Stamped<String>) elems[insertionIndex++]);
-        assertEquals(98L, tracker.get());
+    public void shouldReturnNotKnownTimestampWhenNoRecordsEverAdded() throws Exception {
+        assertThat(tracker.get(), equalTo(TimestampTracker.NOT_KNOWN));
+    }
 
-        // remove 101
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(98L, tracker.get());
+    @Test
+    public void shouldReturnTimestampOfOnlyRecord() throws Exception {
+        tracker.addElement(elem(100));
+        assertThat(tracker.get(), equalTo(100L));
+    }
 
-        // remove 102
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(98L, tracker.get());
+    @Test
+    public void shouldReturnLowestAvailableTimestampFromAllInputs() throws Exception {
+        tracker.addElement(elem(100));
+        tracker.addElement(elem(99));
+        tracker.addElement(elem(102));
+        assertThat(tracker.get(), equalTo(99L));
+    }
 
-        // remove 98
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(99L, tracker.get());
+    @Test
+    public void shouldReturnLowestAvailableTimestampAfterPreviousLowestRemoved() throws Exception {
+        final Stamped<String> lowest = elem(88);
+        tracker.addElement(lowest);
+        tracker.addElement(elem(101));
+        tracker.addElement(elem(99));
+        tracker.removeElement(lowest);
+        assertThat(tracker.get(), equalTo(99L));
+    }
 
-        // remove 99
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(100L, tracker.get());
+    @Test
+    public void shouldReturnLastKnownTimestampWhenAllElementsHaveBeenRemoved() throws Exception {
+        final Stamped<String> record = elem(98);
+        tracker.addElement(record);
+        tracker.removeElement(record);
+        assertThat(tracker.get(), equalTo(98L));
+    }
 
-        // remove 100
-        tracker.removeElement((Stamped<String>) elems[removalIndex++]);
-        assertEquals(100L, tracker.get());
+    @Test
+    public void shouldIgnoreNullRecordOnRemove() throws Exception {
+        tracker.removeElement(null);
+    }
 
-        assertEquals(insertionIndex, removalIndex);
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerExceptionWhenTryingToAddNullElement() throws Exception {
+        tracker.addElement(null);
     }
 
+    private Stamped<String> elem(final long timestamp) {
+        return new Stamped<>("", timestamp);
+    }
 }
\ No newline at end of file