Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/19 20:08:15 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579446056



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +268,27 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void emptyPartitionsAreCorrectlyHandledWhenResettingByDateAndTime() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new EmptyPartitionConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        final long yesterdayTimestamp = Instant.now().minus(Duration.ofDays(1)).toEpochMilli();
+
+        streamsResetter.resetToDatetime(emptyConsumer, inputTopicPartitions, yesterdayTimestamp);
+
+        final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
       Same comment as above.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+        final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
       Not sure I understand this test. Because we use a `MockConsumer` and never call `addRecord()`, this condition should be `true` independently of `StreamsResetter`.
   
   Shouldn't we instead verify that `streamsResetter` _committed_ the offsets as expected?
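The point about the vacuous assertion can be illustrated without a broker. The sketch below is hypothetical: `FakeClient` is an in-memory stand-in (not Kafka's `MockConsumer`), partitions are plain strings, and `resetOffsetTo` is assumed to clamp the requested offset into `[beginning, end]`. The check targets the resulting position, so it fails if no seek happened or if the out-of-range offset 2 was used unclamped, whereas an empty `poll()` passes either way.

```java
import java.util.HashMap;
import java.util.Map;

public class PositionAssertionSketch {
    /** Hypothetical in-memory stand-in for a consumer; it only tracks offsets per partition. */
    static final class FakeClient {
        final Map<String, Long> beginningOffsets = new HashMap<>();
        final Map<String, Long> endOffsets = new HashMap<>();
        final Map<String, Long> positions = new HashMap<>();

        void seek(String partition, long offset) {
            positions.put(partition, offset);
        }

        // Throws NullPointerException (on unboxing) if seek() was never called for this partition.
        long position(String partition) {
            return positions.get(partition);
        }
    }

    /** Hypothetical reset step, assumed to clamp the requested offset into [beginning, end]. */
    static void resetOffsetTo(FakeClient client, String partition, long requested) {
        long beginning = client.beginningOffsets.get(partition);
        long end = client.endOffsets.get(partition);
        client.seek(partition, Math.max(beginning, Math.min(requested, end)));
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        // Empty partition: beginning and end offsets are both 0, as in the test under review.
        client.beginningOffsets.put("input-0", 0L);
        client.endOffsets.put("input-0", 0L);

        resetOffsetTo(client, "input-0", 2L);

        // Unlike an empty poll(), this check depends on what the reset step actually did.
        if (client.position("input-0") != 0L) {
            throw new AssertionError("expected position 0, got " + client.position("input-0"));
        }
        System.out.println("position = " + client.position("input-0"));
    }
}
```

In the real test, the analogous check would target `emptyConsumer.position(topicPartition)` or the consumer's committed offsets rather than `records.count()`.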

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
+                        " is empty, without a committed record. Falling back to latest known offset.");

Review comment:
       > without a committed record
   
   Not sure what this means. Does `is empty` need any further clarification?
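The NPE came from calling `.offset()` on the result of `topicPartitionsAndOffset.get(topicPartition)`, because `offsetsForTimes()` maps a partition to `null` when it has no record with a timestamp at or after the target. The following stdlib-only sketch isolates the null-safe lookup from the diff so it can be exercised without a broker. `SeekTargetSketch`, `OffsetAndTs`, and `seekTarget` are hypothetical names, partitions are plain strings, and `UNKNOWN_OFFSET` is assumed to equal `-1`, like `ListOffsetsResponse.UNKNOWN_OFFSET`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class SeekTargetSketch {
    /** Hypothetical stand-in for Kafka's OffsetAndTimestamp; only the offset is used here. */
    static final class OffsetAndTs {
        final long offset;

        OffsetAndTs(long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    /** Assumed to mirror ListOffsetsResponse.UNKNOWN_OFFSET (-1). */
    static final long UNKNOWN_OFFSET = -1L;

    /**
     * Returns the offset to seek to, or Optional.empty() when the caller should
     * fall back to seeking to the end of the partition.
     */
    static Optional<Long> seekTarget(Map<String, OffsetAndTs> lookup, String partition) {
        return Optional.ofNullable(lookup.get(partition)) // null: no matching record, or partition absent
                .map(OffsetAndTs::offset)
                .filter(offset -> offset != UNKNOWN_OFFSET); // Long unboxed, numeric comparison
    }

    public static void main(String[] args) {
        Map<String, OffsetAndTs> lookup = new HashMap<>();
        lookup.put("input-0", new OffsetAndTs(42L));
        lookup.put("input-1", null); // what offsetsForTimes() returns when no record matches
        lookup.put("input-2", new OffsetAndTs(UNKNOWN_OFFSET));
        // "input-3" is deliberately missing from the map.

        for (String p : new String[] {"input-0", "input-1", "input-2", "input-3"}) {
            Optional<Long> target = seekTarget(lookup, p);
            System.out.println(p + " -> " + (target.isPresent() ? "seek(" + target.get() + ")" : "seekToEnd()"));
        }
    }
}
```

Running `main` prints `seek(42)` for `input-0` and `seekToEnd()` for the other three partitions, covering a `null` entry, the unknown-offset sentinel, and a partition missing from the map entirely.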

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {

Review comment:
       This test seems to be orthogonal to the fix. I was just wondering why we are adding it and what its purpose is. It's always good to close testing gaps; I'm just not sure I understand what this test actually tests.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org