Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/17 21:35:22 UTC

[kafka] branch 2.1 updated: KAFKA-8347: Choose next record to process by timestamp (#6719)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 25d9920  KAFKA-8347: Choose next record to process by timestamp (#6719)
25d9920 is described below

commit 25d992038f03cc96b96496a065560f2ca3565c5c
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Thu May 16 07:07:54 2019 -0700

    KAFKA-8347: Choose next record to process by timestamp (#6719)
    
    When choosing the next record to process, we should look at each partition's head record timestamp and choose the lowest, rather than choosing the partition with the lowest stream time.
    
    This change effectively makes RecordQueue report the timestamp of its head record rather than its stream time. Stream time is removed from RecordQueue entirely, since it was only being tracked there in order to choose the next partition to poll from.
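    
    As a minimal sketch of the selection rule this implies (the helper name
    and the collection are hypothetical, not the actual PartitionGroup code):
    
        // Pick the partition whose head record carries the smallest
        // timestamp; empty queues are skipped rather than compared.
        private RecordQueue queueWithLowestHeadTimestamp(final Collection<RecordQueue> queues) {
            RecordQueue next = null;
            for (final RecordQueue queue : queues) {
                if (queue.isEmpty()) {
                    continue;
                }
                // timestamp() now reports the head record's timestamp
                if (next == null || queue.timestamp() < next.timestamp()) {
                    next = queue;
                }
            }
            return next;
        }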
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bb...@gmail.com>
---
 .../streams/processor/internals/RecordQueue.java   | 17 ++----
 .../processor/internals/PartitionGroupTest.java    | 70 ++++++++++++++++++----
 .../processor/internals/RecordQueueTest.java       |  8 +--
 3 files changed, 68 insertions(+), 27 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 572e629..7f3c08d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -46,7 +46,6 @@ public class RecordQueue {
     private final RecordDeserializer recordDeserializer;
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
-    private long partitionTime = UNKNOWN;
     private StampedRecord headRecord = null;
 
     RecordQueue(final TopicPartition partition,
@@ -98,7 +97,7 @@ public class RecordQueue {
             fifoQueue.addLast(rawRecord);
         }
 
-        maybeUpdateTimestamp();
+        updateHead();
 
         return size();
     }
@@ -112,7 +111,7 @@ public class RecordQueue {
         final StampedRecord recordToReturn = headRecord;
         headRecord = null;
 
-        maybeUpdateTimestamp();
+        updateHead();
 
         return recordToReturn;
     }
@@ -142,7 +141,7 @@ public class RecordQueue {
      * @return timestamp
      */
     public long timestamp() {
-        return partitionTime;
+        return headRecord == null ? UNKNOWN : headRecord.timestamp;
     }
 
     /**
@@ -151,10 +150,9 @@ public class RecordQueue {
     public void clear() {
         fifoQueue.clear();
         headRecord = null;
-        partitionTime = UNKNOWN;
     }
 
-    private void maybeUpdateTimestamp() {
+    private void updateHead() {
         while (headRecord == null && !fifoQueue.isEmpty()) {
             final ConsumerRecord<byte[], byte[]> raw = fifoQueue.pollFirst();
             final ConsumerRecord<Object, Object> deserialized = recordDeserializer.deserialize(processorContext, raw);
@@ -166,7 +164,7 @@ public class RecordQueue {
 
             final long timestamp;
             try {
-                timestamp = timestampExtractor.extract(deserialized, partitionTime);
+                timestamp = timestampExtractor.extract(deserialized, timestamp());
             } catch (final StreamsException internalFatalExtractorException) {
                 throw internalFatalExtractorException;
             } catch (final Exception fatalUserException) {
@@ -187,11 +185,6 @@ public class RecordQueue {
             }
 
             headRecord = new StampedRecord(deserialized, timestamp);
-
-            // update the partition timestamp if the current head record's timestamp has exceed its value
-            if (timestamp > partitionTime) {
-                partitionTime = timestamp;
-            }
         }
     }
 }
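
Restating the net effect of the RecordQueue change as a hedged usage sketch (it mirrors the updated RecordQueueTest expectations below; UNKNOWN is referenced from within the same package, as the tests do):

    // Records buffered with timestamps 2, 1, 3: the head record has ts=2.
    assertEquals(2L, queue.timestamp());
    assertEquals(2L, queue.poll().timestamp);  // poll the head...
    assertEquals(1L, queue.timestamp());       // ...reported time may now move backwards
    queue.poll();                              // drain the remaining records
    queue.poll();
    assertEquals(RecordQueue.UNKNOWN, queue.timestamp());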
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index c84bbc2..6b95bdf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -117,7 +117,7 @@ public class PartitionGroupTest {
         record = group.nextRecord(info);
         // 1:[3, 5]
         // 2:[2, 4, 6]
-        // st: 2
+        // st: 1
         assertEquals(partition1, info.partition());
         verifyTimes(record, 1L, 1L);
         verifyBuffered(5, 2, 3);
@@ -127,7 +127,7 @@ public class PartitionGroupTest {
         record = group.nextRecord(info);
         // 1:[3, 5]
         // 2:[4, 6]
-        // st: 3
+        // st: 2
         assertEquals(partition2, info.partition());
         verifyTimes(record, 2L, 2L);
         verifyBuffered(4, 2, 2);
@@ -141,32 +141,32 @@ public class PartitionGroupTest {
         group.addRawRecords(partition1, list3);
         // 1:[3, 5, 2, 4]
         // 2:[4, 6]
-        // st: 3 (non-decreasing, so adding 2 doesn't change it)
+        // st: 2 (just adding records shouldn't change it)
         verifyBuffered(6, 4, 2);
         assertEquals(2L, group.timestamp());
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
-        // get one record, time should not be advanced
+        // get one record, time should be advanced
         record = group.nextRecord(info);
         // 1:[5, 2, 4]
         // 2:[4, 6]
-        // st: 4 as partition st is now {5, 4}
+        // st: 3
         assertEquals(partition1, info.partition());
         verifyTimes(record, 3L, 3L);
         verifyBuffered(5, 3, 2);
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
-        // get one record, time should not be advanced
+        // get one record, time should be advanced
         record = group.nextRecord(info);
         // 1:[5, 2, 4]
         // 2:[6]
-        // st: 5 as partition st is now {5, 6}
+        // st: 4
         assertEquals(partition2, info.partition());
         verifyTimes(record, 4L, 4L);
         verifyBuffered(4, 3, 1);
         assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
 
-        // get one more record, now time should be advanced
+        // get one more record, time should be advanced
         record = group.nextRecord(info);
         // 1:[2, 4]
         // 2:[6]
@@ -190,17 +190,17 @@ public class PartitionGroupTest {
         record = group.nextRecord(info);
         // 1:[]
         // 2:[6]
-        // st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4)
+        // st: 5
         assertEquals(partition1, info.partition());
         verifyTimes(record, 4L, 5L);
         verifyBuffered(1, 0, 1);
         assertEquals(1.0, metrics.metric(lastLatenessValue).metricValue());
 
-        // get one more record, time should not be advanced
+        // get one more record, time should be advanced
         record = group.nextRecord(info);
         // 1:[]
         // 2:[]
-        // st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.)
+        // st: 6
         assertEquals(partition2, info.partition());
         verifyTimes(record, 6L, 6L);
         verifyBuffered(0, 0, 0);
@@ -208,6 +208,54 @@ public class PartitionGroupTest {
 
     }
 
+    @Test
+    public void shouldChooseNextRecordBasedOnHeadTimestamp() {
+        assertEquals(0, group.numBuffered());
+
+        // add three records with timestamps 1, 5, 3 to partition-1
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue));
+
+        group.addRawRecords(partition1, list1);
+
+        verifyBuffered(3, 3, 0);
+        assertEquals(-1L, group.timestamp());
+        assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
+
+        StampedRecord record;
+        final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
+
+        // get first two records from partition 1
+        record = group.nextRecord(info);
+        assertEquals(1L, record.timestamp);
+        record = group.nextRecord(info);
+        assertEquals(5L, record.timestamp);
+
+        // add three records with timestamps 2, 4, 6 to partition-2
+        final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
+            new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
+
+        group.addRawRecords(partition2, list2);
+        // 1:[3]
+        // 2:[2, 4, 6]
+
+        // get one record, next record should be ts=2 from partition 2
+        record = group.nextRecord(info);
+        // 1:[3]
+        // 2:[4, 6]
+        assertEquals(2L, record.timestamp);
+
+        // get one record, next up should be ts=3 from partition 1 (even though partition 1 has already seen a larger timestamp, 5)
+        record = group.nextRecord(info);
+        // 1:[]
+        // 2:[4, 6]
+        assertEquals(3L, record.timestamp);
+    }
+
     private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
         assertEquals(recordTime, record.timestamp);
         assertEquals(streamTime, group.timestamp());
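
One subtlety in the RecordQueue change above: timestampExtractor.extract() is now handed timestamp() as its previousTimestamp argument, and since extraction only runs while headRecord is null, a custom extractor will observe UNKNOWN (-1) there rather than the partition's highest timestamp seen so far. A hedged sketch of an extractor that reads that argument (illustrative only, not part of this patch):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    public class FallbackTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record,
                            final long previousTimestamp) {
            // Under this patch, previousTimestamp arrives as UNKNOWN (-1)
            // while the head slot is being refilled, not as the max-seen
            // timestamp, so fallbacks based on it no longer advance.
            return record.timestamp() >= 0 ? record.timestamp() : previousTimestamp;
        }
    }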
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index b91aba5..c16cb2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -117,7 +117,7 @@ public class RecordQueueTest {
         // poll the first record, now with 1, 3
         assertEquals(2L, queue.poll().timestamp);
         assertEquals(2, queue.size());
-        assertEquals(2L, queue.timestamp());
+        assertEquals(1L, queue.timestamp());
 
         // poll the second record, now with 3
         assertEquals(1L, queue.poll().timestamp);
@@ -143,15 +143,15 @@ public class RecordQueueTest {
 
         // poll the rest records
         assertEquals(4L, queue.poll().timestamp);
-        assertEquals(4L, queue.timestamp());
+        assertEquals(1L, queue.timestamp());
 
         assertEquals(1L, queue.poll().timestamp);
-        assertEquals(4L, queue.timestamp());
+        assertEquals(2L, queue.timestamp());
 
         assertEquals(2L, queue.poll().timestamp);
         assertTrue(queue.isEmpty());
         assertEquals(0, queue.size());
-        assertEquals(4L, queue.timestamp());
+        assertEquals(RecordQueue.UNKNOWN, queue.timestamp());
 
         // add three more records with 4, 5, 6
         final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(