You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/08 23:07:19 UTC

kafka git commit: KAFKA-6179: Clear min timestamp tracker upon partition queue cleanup

Repository: kafka
Updated Branches:
  refs/heads/trunk 9b44c3e7f -> ee1aaa091


KAFKA-6179: Clear min timestamp tracker upon partition queue cleanup

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>

Closes #4186 from guozhangwang/K6179-cleanup-timestamp-tracker-on-clear


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee1aaa09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee1aaa09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee1aaa09

Branch: refs/heads/trunk
Commit: ee1aaa091fc68587635604de5006b7acdb160361
Parents: 9b44c3e
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Nov 8 15:07:14 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 8 15:07:14 2017 -0800

----------------------------------------------------------------------
 .../internals/MinTimestampTracker.java          |  4 ++++
 .../processor/internals/RecordQueue.java        | 13 +++++++++-
 .../processor/internals/TimestampTracker.java   |  6 ++++-
 .../processor/internals/RecordQueueTest.java    | 25 ++++++++++++++++++++
 4 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1aaa09/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
index 17648e3..df35c3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -77,4 +77,8 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
             return stamped.timestamp;
     }
 
+    public void clear() {
+        lastKnownTime = NOT_KNOWN;
+        ascendingSubsequence.clear();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1aaa09/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index e6facaf..85f3b72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -180,9 +180,20 @@ public class RecordQueue {
     }
 
     /**
-     * Clear the fifo queue of its elements
+     * Clear the fifo queue of its elements, and also clear the stamped elements kept by the time tracker
      */
     public void clear() {
         fifoQueue.clear();
+        timeTracker.clear();
+        partitionTime = TimestampTracker.NOT_KNOWN;
+    }
+
+    /*
+     * Returns the timestamp tracker of the record queue
+     *
+     * This is only used for testing
+     */
+    TimestampTracker<ConsumerRecord<Object, Object>> timeTracker() {
+        return timeTracker;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1aaa09/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
index 9d56b96..30c816d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
@@ -24,7 +24,7 @@ package org.apache.kafka.streams.processor.internals;
  */
 public interface TimestampTracker<E> {
 
-    static final long NOT_KNOWN = -1L;
+    long NOT_KNOWN = -1L;
 
     /**
      * Adds a stamped elements to this tracker.
@@ -54,4 +54,8 @@ public interface TimestampTracker<E> {
      */
     int size();
 
+    /**
+     * Empty the tracker by removing any tracked stamped elements
+     */
+    void clear();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1aaa09/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index c7af928..d7697cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -88,6 +88,8 @@ public class RecordQueueTest {
     public void testTimeTracking() {
 
         assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
 
         // add three 3 out-of-order records with timestamp 2, 1, 3
         List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -99,16 +101,19 @@ public class RecordQueueTest {
 
         assertEquals(3, queue.size());
         assertEquals(1L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
         assertEquals(2, queue.size());
         assertEquals(1L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         // poll the second record, now with 3
         assertEquals(1L, queue.poll().timestamp);
         assertEquals(1, queue.size());
         assertEquals(3L, queue.timestamp());
+        assertEquals(1, queue.timeTracker().size());
 
         // add three 3 out-of-order records with timestamp 4, 1, 2
         // now with 3, 4, 1, 2
@@ -121,22 +126,28 @@ public class RecordQueueTest {
 
         assertEquals(4, queue.size());
         assertEquals(3L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         // poll the third record, now with 4, 1, 2
         assertEquals(3L, queue.poll().timestamp);
         assertEquals(3, queue.size());
         assertEquals(3L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         // poll the rest records
         assertEquals(4L, queue.poll().timestamp);
         assertEquals(3L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
 
         assertEquals(1L, queue.poll().timestamp);
         assertEquals(3L, queue.timestamp());
+        assertEquals(1, queue.timeTracker().size());
 
         assertEquals(2L, queue.poll().timestamp);
+        assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
         assertEquals(3L, queue.timestamp());
+        assertEquals(0, queue.timeTracker().size());
 
         // add three more records with 4, 5, 6
         List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -153,6 +164,20 @@ public class RecordQueueTest {
         assertEquals(4L, queue.poll().timestamp);
         assertEquals(2, queue.size());
         assertEquals(5L, queue.timestamp());
+        assertEquals(2, queue.timeTracker().size());
+
+        // clear the queue
+        queue.clear();
+        assertTrue(queue.isEmpty());
+        assertEquals(0, queue.size());
+        assertEquals(0, queue.timeTracker().size());
+        assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
+
+        // re-insert the three records with 4, 5, 6
+        queue.addRawRecords(list3);
+
+        assertEquals(3, queue.size());
+        assertEquals(4L, queue.timestamp());
     }
 
     @Test(expected = StreamsException.class)