You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/08 23:07:19 UTC
kafka git commit: KAFKA-6179: Clear min timestamp tracker upon
partition queue cleanup
Repository: kafka
Updated Branches:
refs/heads/trunk 9b44c3e7f -> ee1aaa091
KAFKA-6179: Clear min timestamp tracker upon partition queue cleanup
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Damian Guy <da...@gmail.com>
Closes #4186 from guozhangwang/K6179-cleanup-timestamp-tracker-on-clear
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ee1aaa09
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ee1aaa09
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ee1aaa09
Branch: refs/heads/trunk
Commit: ee1aaa091fc68587635604de5006b7acdb160361
Parents: 9b44c3e
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Nov 8 15:07:14 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 8 15:07:14 2017 -0800
----------------------------------------------------------------------
.../internals/MinTimestampTracker.java | 4 ++++
.../processor/internals/RecordQueue.java | 13 +++++++++-
.../processor/internals/TimestampTracker.java | 6 ++++-
.../processor/internals/RecordQueueTest.java | 25 ++++++++++++++++++++
4 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1aaa09/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
index 17648e3..df35c3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/MinTimestampTracker.java
@@ -77,4 +77,8 @@ public class MinTimestampTracker<E> implements TimestampTracker<E> {
return stamped.timestamp;
}
+ public void clear() {
+ lastKnownTime = NOT_KNOWN;
+ ascendingSubsequence.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1aaa09/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index e6facaf..85f3b72 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -180,9 +180,20 @@ public class RecordQueue {
}
/**
- * Clear the fifo queue of its elements
+ * Clear the fifo queue of its elements, also clear the time tracker's kept stamped elements
*/
public void clear() {
fifoQueue.clear();
+ timeTracker.clear();
+ partitionTime = TimestampTracker.NOT_KNOWN;
+ }
+
+ /*
+ * Returns the timestamp tracker of the record queue
+ *
+ * This is only used for testing
+ */
+ TimestampTracker<ConsumerRecord<Object, Object>> timeTracker() {
+ return timeTracker;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1aaa09/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
index 9d56b96..30c816d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
@@ -24,7 +24,7 @@ package org.apache.kafka.streams.processor.internals;
*/
public interface TimestampTracker<E> {
- static final long NOT_KNOWN = -1L;
+ long NOT_KNOWN = -1L;
/**
* Adds a stamped elements to this tracker.
@@ -54,4 +54,8 @@ public interface TimestampTracker<E> {
*/
int size();
+ /**
+ * Empty the tracker by removing any tracked stamped elements
+ */
+ void clear();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/ee1aaa09/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index c7af928..d7697cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -88,6 +88,8 @@ public class RecordQueueTest {
public void testTimeTracking() {
assertTrue(queue.isEmpty());
+ assertEquals(0, queue.size());
+ assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
// add three 3 out-of-order records with timestamp 2, 1, 3
List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
@@ -99,16 +101,19 @@ public class RecordQueueTest {
assertEquals(3, queue.size());
assertEquals(1L, queue.timestamp());
+ assertEquals(2, queue.timeTracker().size());
// poll the first record, now with 1, 3
assertEquals(2L, queue.poll().timestamp);
assertEquals(2, queue.size());
assertEquals(1L, queue.timestamp());
+ assertEquals(2, queue.timeTracker().size());
// poll the second record, now with 3
assertEquals(1L, queue.poll().timestamp);
assertEquals(1, queue.size());
assertEquals(3L, queue.timestamp());
+ assertEquals(1, queue.timeTracker().size());
// add three 3 out-of-order records with timestamp 4, 1, 2
// now with 3, 4, 1, 2
@@ -121,22 +126,28 @@ public class RecordQueueTest {
assertEquals(4, queue.size());
assertEquals(3L, queue.timestamp());
+ assertEquals(2, queue.timeTracker().size());
// poll the third record, now with 4, 1, 2
assertEquals(3L, queue.poll().timestamp);
assertEquals(3, queue.size());
assertEquals(3L, queue.timestamp());
+ assertEquals(2, queue.timeTracker().size());
// poll the rest records
assertEquals(4L, queue.poll().timestamp);
assertEquals(3L, queue.timestamp());
+ assertEquals(2, queue.timeTracker().size());
assertEquals(1L, queue.poll().timestamp);
assertEquals(3L, queue.timestamp());
+ assertEquals(1, queue.timeTracker().size());
assertEquals(2L, queue.poll().timestamp);
+ assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
assertEquals(3L, queue.timestamp());
+ assertEquals(0, queue.timeTracker().size());
// add three more records with 4, 5, 6
List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -153,6 +164,20 @@ public class RecordQueueTest {
assertEquals(4L, queue.poll().timestamp);
assertEquals(2, queue.size());
assertEquals(5L, queue.timestamp());
+ assertEquals(2, queue.timeTracker().size());
+
+ // clear the queue
+ queue.clear();
+ assertTrue(queue.isEmpty());
+ assertEquals(0, queue.size());
+ assertEquals(0, queue.timeTracker().size());
+ assertEquals(TimestampTracker.NOT_KNOWN, queue.timestamp());
+
+ // re-insert the three records with 4, 5, 6
+ queue.addRawRecords(list3);
+
+ assertEquals(3, queue.size());
+ assertEquals(4L, queue.timestamp());
}
@Test(expected = StreamsException.class)