Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/03 18:52:59 UTC

[GitHub] [kafka] MarcoLotz opened a new pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

MarcoLotz opened a new pull request #10042:
URL: https://github.com/apache/kafka/pull/10042


   **Description:**
   An NPE occurs when using the StreamsResetter tool with the "to-datetime" and "by-duration" arguments on empty partitions.
   
   **Cause:**
   This happens because the [fetcher](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L516) returns a null value in the map for empty partitions.
   
   **Solution:**
   The null values need to be handled correctly with Optional. For an empty partition, consumer.seek() should not be called, letting the consumer fall back to the "auto.offset.reset" behaviour. The user should be notified when this happens.
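A minimal sketch of the null-safe lookup described above, written in plain Java so it compiles without Kafka on the classpath. `OffsetAndTimestampStub` and the string partition keys are hypothetical stand-ins for Kafka's `OffsetAndTimestamp` and `TopicPartition`, and the returned strings only record which branch ran; this is an illustration, not the code from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Kafka's OffsetAndTimestamp, so the sketch compiles on its own.
final class OffsetAndTimestampStub {
    private final long offset;

    OffsetAndTimestampStub(final long offset) {
        this.offset = offset;
    }

    long offset() {
        return offset;
    }
}

final class NullSafeResetSketch {
    // Returns a description of the action taken so callers can check which branch ran.
    static String resetPartition(final Map<String, OffsetAndTimestampStub> offsetsForTimes,
                                 final String partition) {
        // An empty partition may map to null, so calling .offset() directly would throw an NPE.
        final Optional<Long> offset = Optional.ofNullable(offsetsForTimes.get(partition))
                .map(OffsetAndTimestampStub::offset);
        if (offset.isPresent()) {
            return "seek(" + partition + ", " + offset.get() + ")";
        }
        // No offset found: skip seek() and leave the position to the fallback behaviour.
        return "fallback(" + partition + ")";
    }

    public static void main(final String[] args) {
        final Map<String, OffsetAndTimestampStub> result = new HashMap<>();
        result.put("input-0", new OffsetAndTimestampStub(42L));
        result.put("input-1", null); // models the null value returned for an empty partition
        System.out.println(resetPartition(result, "input-0")); // seek(input-0, 42)
        System.out.println(resetPartition(result, "input-1")); // fallback(input-1)
    }
}
```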
   
   **Implementation comments:**
   
   - System.out.println was used instead of log.warn, since the whole class uses System.out to interact with the user.
   - There was no test for an empty partition when resetting to a specific offset [streamsResetter.resetOffsetsTo()]. A test was added.
   - Added a test for an empty partition when resetToDatetime() is called, which is where the bug occurred.
   - MockConsumer doesn't implement the offsetsForTimes() method; it throws an exception when called, which breaches the Liskov Substitution Principle. This method is used by resetToDatetime(). Since I couldn't find Mockito in the project dependencies to create a spy, I decided to extend the MockConsumer class as a nested class and specialize the method.
   - Looking at Gradle, I saw that the minimum Java version is 1.8, so I decided to use the Java 1.8 Optional API instead of the 1.9 one, which would be a lot cleaner, especially with Optional.ifPresentOrElse().
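For comparison, a sketch of the two `Optional` styles mentioned in the last bullet: the Java 8 `isPresent()`/`get()` branch and the Java 9+ `Optional.ifPresentOrElse(Consumer, Runnable)` form. The `actions` list is a hypothetical recorder standing in for the `seek()` call and the fallback; the second method needs Java 9 or later to compile, which is why it could not be used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class OptionalStyleSketch {
    // Java 8 style: branch explicitly with isPresent()/get().
    static void java8Style(final Optional<Long> offset, final List<String> actions) {
        if (offset.isPresent()) {
            actions.add("seek " + offset.get());
        } else {
            actions.add("fallback");
        }
    }

    // Java 9+ style: ifPresentOrElse takes a Consumer for the value and a Runnable for the empty case.
    static void java9Style(final Optional<Long> offset, final List<String> actions) {
        offset.ifPresentOrElse(
                value -> actions.add("seek " + value),
                () -> actions.add("fallback"));
    }

    public static void main(final String[] args) {
        final List<String> actions = new ArrayList<>();
        java9Style(Optional.of(7L), actions);
        java9Style(Optional.empty(), actions);
        System.out.println(actions); // [seek 7, fallback]
    }
}
```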
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] MarcoLotz commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-774675817


   @jeqo changed the fallback to seek to the latest offset and aligned the check with the UNKNOWN_OFFSET recommendation.





[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579446056



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +268,27 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void emptyPartitionsAreCorrectlyHandledWhenResettingByDateAndTime() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new EmptyPartitionConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        final long yesterdayTimestamp = Instant.now().minus(Duration.ofDays(1)).toEpochMilli();
+
+        streamsResetter.resetToDatetime(emptyConsumer, inputTopicPartitions, yesterdayTimestamp);
+
+        final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
       Same comment as above.

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+        final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
       Not sure if I understand this test. As we use a `MockConsumer` and we never call `addRecord()` this condition should be `true` independent of `StreamsResetter`.
   
   Should we not rather verify if `streamsResetter` did _commit_ offsets as expected?

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
+                        " is empty, without a committed record. Falling back to latest known offset.");

Review comment:
       > without a committed record
   
   Not sure what this means? Does `is empty` need any clarification?

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {

Review comment:
       This test seems to be orthogonal to the fix. I was just wondering why we are adding it and what its purpose is. It's always good to close testing gaps; I'm just not sure I understand what this test really tests.







[GitHub] [kafka] MarcoLotz commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-809269799


   @mjsax based on your points, I have updated the tests. The new tests verify the seek operations and cover multiple offset scenarios, to ensure that no zero offset is hardcoded.





[GitHub] [kafka] jeqo commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r571434067



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +505,15 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset);

Review comment:
       Just in case: the latest consumer-group reset-offset logic compares against `ListOffsetsResponse.UNKNOWN_OFFSET` instead of `null`.
   
   https://github.com/apache/kafka/blob/0bc394cc1d19f1e41dd6646e9ac0e09b91fb1398/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L680-L681







[GitHub] [kafka] mjsax merged pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10042:
URL: https://github.com/apache/kafka/pull/10042


   





[GitHub] [kafka] jeqo commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r587485775



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
+                        " is empty, without a committed record. Falling back to latest known offset.");

Review comment:
       Empty should be clear enough. Will update it in #10092 







[GitHub] [kafka] mjsax commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-816981220


   Not your fault... `trunk` is broken: https://github.com/apache/kafka/pull/10517





[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r571617491



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +505,15 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset);

Review comment:
       You are right. The value of ListOffsetsResponse.UNKNOWN_OFFSET is -1. It is not enough to check for null (the cause of the NPE bug); we also need to make sure that the offset is not ListOffsetsResponse.UNKNOWN_OFFSET.
   
   I have added a filter to the Optional.
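A sketch of why both checks are needed, assuming (as stated above) that `ListOffsetsResponse.UNKNOWN_OFFSET` is -1. `Entry` is a hypothetical stand-in for `OffsetAndTimestamp`. The sketch declares its constant as a primitive `long`, so `offset != UNKNOWN_OFFSET` unboxes the `Long` and compares values rather than references.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class UnknownOffsetSketch {
    // Assumed to mirror ListOffsetsResponse.UNKNOWN_OFFSET (-1). As a primitive long,
    // comparing a boxed Long against it unboxes and compares values, not references.
    static final long UNKNOWN_OFFSET = -1L;

    // Hypothetical stand-in for OffsetAndTimestamp.
    static final class Entry {
        private final long offset;

        Entry(final long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    static Optional<Long> usableOffset(final Map<String, Entry> result, final String partition) {
        return Optional.ofNullable(result.get(partition)) // empty if the entry is missing or null
                .map(Entry::offset)
                .filter(offset -> offset != UNKNOWN_OFFSET); // empty if the -1 sentinel came back
    }

    public static void main(final String[] args) {
        final Map<String, Entry> result = new HashMap<>();
        result.put("in-0", new Entry(12L));
        result.put("in-1", null);
        result.put("in-2", new Entry(UNKNOWN_OFFSET));
        System.out.println(usableOffset(result, "in-0")); // Optional[12]
        System.out.println(usableOffset(result, "in-1")); // Optional.empty
        System.out.println(usableOffset(result, "in-2")); // Optional.empty
    }
}
```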







[GitHub] [kafka] MarcoLotz commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-816957853


   Updated the PR and rebased onto trunk. The build seems to be failing on trunk due to RocksDBTimeOrderedWindowStore.java.





[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r610849890



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,35 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetOffsetToSpecificOffsetWhenAfterEndOffset() {
+        final long[] beginningOffsets = {0L, 5L, 10L, 20L};
+        final long[] endOffsets = {0L, 5L, 10L, 20L};
+        // Test on multiple beginning and end offset combinations
+        for (int beginningOffsetIndex = 0; beginningOffsetIndex < beginningOffsets.length; beginningOffsetIndex++) {

Review comment:
       I actually think that a single combination beginning=5 end=10 should be sufficient.
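A minimal sketch of the behaviour such a single-combination test would pin down, assuming the resetter clamps a requested offset into the range [beginning, end]; `clamp` is a hypothetical helper, not the StreamsResetter method. With beginning=5 and end=10, a target past the end (20) maps to 10, which differs from both the beginning offset and the target, so one combination is enough to tell the cases apart.

```java
final class ClampSketch {
    // Hypothetical helper, assuming the resetter clamps a requested offset into [beginning, end].
    static long clamp(final long requested, final long beginning, final long end) {
        return Math.max(beginning, Math.min(requested, end));
    }

    public static void main(final String[] args) {
        // Single combination suggested in the review: beginning=5, end=10.
        System.out.println(clamp(20L, 5L, 10L)); // 10 (past the end)
        System.out.println(clamp(2L, 5L, 10L));  // 5 (before the beginning)
        System.out.println(clamp(7L, 5L, 10L));  // 7 (in range)
    }
}
```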

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +278,32 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
+        final long[] beginningAndEndOffsets = {0L, 5L, 10L, 20L};

Review comment:
       To ensure that we really seek to the end, the beginning and end offsets should be set to different values; otherwise, we cannot distinguish the two. Thus, similar to the test above, testing a single combination of beginning=5 and end=10 seems sufficient?







[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579519988



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+        final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
       @mjsax I think I got your point. In the CUT, the call ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` makes the consumer client seek to that offset (if available) without committing it. The offset commit happens in another section of the code that is not under test there (line 407 of StreamsResetter.java).
   
   I will change the test logic so that the "given condition" uses ```streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);``` and the "when condition" is the call to ```client.commitSync();```.
   
   This way, the "then condition" can test the committed offsets. The same applies to the other test.
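A sketch of the given/when/then structure described above, using a hypothetical in-memory `FakeConsumer` (not Kafka's `MockConsumer`) that only records seeks and commits. It illustrates that a seek alone leaves no committed offset, so the assertion has to run after `commitSync()` and inspect the committed offset rather than the poll result.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory test double (not Kafka's MockConsumer): it only tracks
// per-partition positions and committed offsets.
final class FakeConsumer {
    private final Map<String, Long> positions = new HashMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    void seek(final String partition, final long offset) {
        positions.put(partition, offset);
    }

    // Commits the current position of every partition that has one.
    void commitSync() {
        committed.putAll(positions);
    }

    // Returns null if nothing has been committed for the partition.
    Long committed(final String partition) {
        return committed.get(partition);
    }
}

final class CommitAssertionSketch {
    public static void main(final String[] args) {
        final FakeConsumer consumer = new FakeConsumer();
        // given: the reset logic has positioned the consumer (seek only, nothing committed yet)
        consumer.seek("input-0", 5L);
        System.out.println(consumer.committed("input-0")); // null
        // when: the offsets are committed
        consumer.commitSync();
        // then: assert on the committed offset rather than on an always-empty poll()
        System.out.println(consumer.committed("input-0")); // 5
    }
}
```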








[GitHub] [kafka] mjsax commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-816995678


   Hotfix merged. Retriggered Jenkins for this PR.





[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r610904483



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +278,32 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
+        final long[] beginningAndEndOffsets = {0L, 5L, 10L, 20L};

Review comment:
       Fair enough :) 







[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579504541



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {

Review comment:
       You are right, the test is indeed orthogonal to the fix. I decided to add it because there was no test scenario for resetting a partition offset (to a specific offset) on an empty partition. The original CUT passed this test without the bug fix. I considered it an important scenario to test.







[GitHub] [kafka] MarcoLotz commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-788295749


   @mjsax can you please have a quick look at the updated tests?





[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579502090



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
+                        " is empty, without a committed record. Falling back to latest known offset.");

Review comment:
       I see your point; I don't mind removing the "without a committed record" part of the message. @jeqo This would have to be updated in the Scala code too, since I saw that the messages are about the same. Would that be ok?







[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r610879225



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,35 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetOffsetToSpecificOffsetWhenAfterEndOffset() {
+        final long[] beginningOffsets = {0L, 5L, 10L, 20L};
+        final long[] endOffsets = {0L, 5L, 10L, 20L};
+        // Test on multiple beginning and end offset combinations
+        for (int beginningOffsetIndex = 0; beginningOffsetIndex < beginningOffsets.length; beginningOffsetIndex++) {

Review comment:
       Sure. Will simplify then







[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r573277614



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +268,26 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void emptyPartitionsAreCorrectlyHandledWhenResettingByDateAndTime() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new EmptyPartitionConsumer<>(OffsetResetStrategy.EARLIEST);
+        final Duration yesterday = Duration.ofDays(1);

Review comment:
       I've moved it to the line just before L285. I would rather keep it there than pass it directly on line L285: keeping it in a variable makes the //given //when //then structure more readable than inlining it in the //when call.







[GitHub] [kafka] mjsax commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r586676476



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);

Review comment:
       Nit: To make sure we don't have any default/fallback offset of zero encoded anywhere, it might be better to test with different offset values for endOffset/beginningOffset and the target offset?
   
   At the moment, if we used `seekToBeginning` as the fallback instead of `seekToEnd`, this test would still pass. Maybe it is best to just use 5, 10, 20 (or similar) for start, end, and target.
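A minimal sketch of the reviewer's point, assuming the reset maps an out-of-range target to the partition's beginning or end offset. `clampToRange` and the deliberately wrong `clampToBeginning` are hypothetical helpers for illustration, not the StreamsResetter code: with beginning = end = 0 both return 0, so a test cannot tell them apart, while start/end/target values of 5/10/20 separate them.

```java
// Hypothetical helpers illustrating why identical zero offsets make a weak test.
public class FallbackOffsetCheck {

    // Assumed "correct" behaviour: a target past the end maps to the end offset,
    // a target before the beginning maps to the beginning offset.
    static long clampToRange(final long target, final long beginning, final long end) {
        if (target > end) {
            return end;
        }
        if (target < beginning) {
            return beginning;
        }
        return target;
    }

    // Deliberately wrong variant: any out-of-range target falls back to the beginning.
    static long clampToBeginning(final long target, final long beginning, final long end) {
        if (target > end || target < beginning) {
            return beginning;
        }
        return target;
    }

    public static void main(final String[] args) {
        // beginning = end = 0, target = 2: both variants agree, so the bug goes unnoticed
        System.out.println(clampToRange(2L, 0L, 0L) + " vs " + clampToBeginning(2L, 0L, 0L));     // 0 vs 0
        // beginning = 5, end = 10, target = 20: the variants now disagree
        System.out.println(clampToRange(20L, 5L, 10L) + " vs " + clampToBeginning(20L, 5L, 10L)); // 10 vs 5
    }
}
```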

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+        emptyConsumer.position(topicPartition);
+
+        emptyConsumer.commitSync();

Review comment:
       Thanks for explaining in the other comment that `resetOffsetsTo` only seeks but does not commit.
   
   For this test, I am wondering if we can verify only that the seek happened, without calling commit in the test code? This might make the test "cleaner". In fact, `emptyConsumer.position(topicPartition);` should return the seek position, and verifying its return value seems sufficient?
   
   I also hope we actually have an "integration test" that checks that both seek and commit happen when using the resetter (i.e., a full test for this case instead of just a partial helper-method test).
   
   We might also add a comment that the helper method `resetOffsetsTo` only seeks but does not commit (to help others make sense of the test more easily).
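To illustrate the distinction drawn in this comment, here is a minimal sketch with a hypothetical `FakeConsumer` that, like a real consumer, keeps the fetch position separate from the committed offset. The `resetOffsetsTo` below is also a hypothetical stand-in that only seeks, so asserting on `position()` is enough to check the reset, and no `commitSync()` is needed in the test.

```java
import java.util.HashMap;
import java.util.Map;

public class SeekVersusCommit {

    // Hypothetical stand-in for a consumer: the position and the committed offset are tracked separately.
    static final class FakeConsumer {
        private final Map<String, Long> positions = new HashMap<>();
        private final Map<String, Long> committed = new HashMap<>();

        void seek(final String partition, final long offset) {
            positions.put(partition, offset); // changes only the in-memory fetch position
        }

        long position(final String partition) {
            return positions.getOrDefault(partition, 0L);
        }

        Long committed(final String partition) {
            return committed.get(partition); // null until commitSync() has run
        }

        void commitSync() {
            committed.putAll(positions); // persists the current positions
        }
    }

    // Hypothetical helper mirroring the behaviour discussed above: it seeks, committing is left to the caller.
    static void resetOffsetsTo(final FakeConsumer consumer, final String partition, final long offset) {
        consumer.seek(partition, offset);
    }

    public static void main(final String[] args) {
        final FakeConsumer consumer = new FakeConsumer();
        resetOffsetsTo(consumer, "topic-0", 10L);
        System.out.println(consumer.position("topic-0"));  // 10
        System.out.println(consumer.committed("topic-0")); // null: nothing was committed
    }
}
```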

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
+                        " is empty, without a committed record. Falling back to latest known offset.");

Review comment:
       If @jeqo has no objections, I think we should remove it for both cases.
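The null handling in the `resetToDatetime` hunk above can be exercised without a broker. A minimal sketch, assuming that `offsetsForTimes` maps an empty partition to `null` and that `ListOffsetsResponse.UNKNOWN_OFFSET` is `-1`. `OffsetAndTimestamp` here is a simplified local stand-in, not the Kafka class, and instead of calling `seekToEnd` the sketch just returns the end offset it would fall back to.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ResetToDatetimeSketch {

    // Local stand-in for ListOffsetsResponse.UNKNOWN_OFFSET; the value -1 is an assumption here.
    static final long UNKNOWN_OFFSET = -1L;

    // Simplified stand-in for OffsetAndTimestamp: only the offset matters for this sketch.
    static final class OffsetAndTimestamp {
        private final long offset;

        OffsetAndTimestamp(final long offset) {
            this.offset = offset;
        }

        long offset() {
            return offset;
        }
    }

    // Same Optional chain as the patch: a null entry or an unknown offset yields an empty Optional,
    // in which case we fall back to the end offset (the patch calls seekToEnd instead).
    static long resolveSeekOffset(final Map<String, OffsetAndTimestamp> offsetsForTimes,
                                  final String partition,
                                  final long endOffset) {
        final Optional<Long> partitionOffset = Optional.ofNullable(offsetsForTimes.get(partition))
                .map(OffsetAndTimestamp::offset)
                .filter(offset -> offset != UNKNOWN_OFFSET);
        return partitionOffset.orElse(endOffset);
    }

    public static void main(final String[] args) {
        final Map<String, OffsetAndTimestamp> result = new HashMap<>();
        result.put("input-0", new OffsetAndTimestamp(7L));             // a record exists for the timestamp
        result.put("input-1", null);                                   // empty partition: null entry
        result.put("input-2", new OffsetAndTimestamp(UNKNOWN_OFFSET)); // sentinel offset

        System.out.println(resolveSeekOffset(result, "input-0", 20L)); // 7
        System.out.println(resolveSeekOffset(result, "input-1", 5L));  // 5
        System.out.println(resolveSeekOffset(result, "input-2", 5L));  // 5
    }
}
```

On Java 9+ the seek/fallback branch could use `Optional.ifPresentOrElse`; the patch keeps the Java 8-compatible `isPresent()`/`get()` form.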

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +274,32 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {

Review comment:
       Same comments as for the first test.







[GitHub] [kafka] mjsax commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-817951507


   Thanks for the fix @MarcoLotz! -- Merged to `trunk`.





[GitHub] [kafka] MarcoLotz commented on pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#issuecomment-817024578


   @mjsax thanks! Rebased onto the hotfix. It should now build as expected.





[GitHub] [kafka] jeqo commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r573239173



##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -503,7 +506,16 @@ private void resetToDatetime(final Consumer<byte[], byte[]> client,
         final Map<TopicPartition, OffsetAndTimestamp> topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
 
         for (final TopicPartition topicPartition : inputTopicPartitions) {
-            client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
+            final Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition))
+                    .map(OffsetAndTimestamp::offset)
+                    .filter(offset -> offset != ListOffsetsResponse.UNKNOWN_OFFSET);
+            if (partitionOffset.isPresent()) {
+                client.seek(topicPartition, partitionOffset.get());
+            } else {
+                client.seekToEnd(Collections.singletonList(topicPartition));
+                System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() +
+                        " is empty, without a committed offset. Falling back to latest offset.");

Review comment:
       nit:
   ```suggestion
                           " is empty, without a committed record. Falling back to latest known offset.");
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +268,26 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void emptyPartitionsAreCorrectlyHandledWhenResettingByDateAndTime() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new EmptyPartitionConsumer<>(OffsetResetStrategy.EARLIEST);
+        final Duration yesterday = Duration.ofDays(1);

Review comment:
       nit: could we move this initialization closer to its usage in L285? Or simply inlining `.minus(Duration.ofDays(1))` might be simpler.
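For reference, the two styles discussed in this nit produce the same timestamp. A minimal sketch, assuming the test derives its reset timestamp from an `Instant`. The `timestampBefore` helper and the fixed instant are hypothetical, while `Instant.minus(Duration)` and `toEpochMilli()` are standard `java.time` calls.

```java
import java.time.Duration;
import java.time.Instant;

public class ResetTimestampSketch {

    // Hypothetical helper: epoch millis for "lookBack" before the given instant.
    static long timestampBefore(final Instant now, final Duration lookBack) {
        return now.minus(lookBack).toEpochMilli();
    }

    public static void main(final String[] args) {
        final Instant now = Instant.parse("2021-02-04T12:00:00Z");

        // given: the look-back kept in a named variable...
        final Duration yesterday = Duration.ofDays(1);
        final long fromVariable = timestampBefore(now, yesterday);

        // ...or inlined directly, as suggested in the nit
        final long inline = now.minus(Duration.ofDays(1)).toEpochMilli();

        System.out.println(fromVariable == inline);       // true
        System.out.println(Instant.ofEpochMilli(inline)); // 2021-02-03T12:00:00Z
    }
}
```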







[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r603182032



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +79,30 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmptyResetsToLatestOffset() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+        emptyConsumer.position(topicPartition);
+
+        emptyConsumer.commitSync();

Review comment:
       Updated the code. I've confirmed that there are integration tests for this in ResetIntegrationTest.java.







[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r579543547



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -76,6 +78,25 @@ public void testResetToSpecificOffsetWhenBetweenBeginningAndEndOffset() {
         assertEquals(3, records.count());
     }
 
+    @Test
+    public void testResetToSpecificOffsetWhenPartitionIsEmpty() {
+        final MockConsumer<byte[], byte[]> emptyConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        emptyConsumer.assign(Collections.singletonList(topicPartition));
+
+        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateEndOffsets(endOffsets);
+
+        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(topicPartition, 0L);
+        emptyConsumer.updateBeginningOffsets(beginningOffsets);
+
+        streamsResetter.resetOffsetsTo(emptyConsumer, inputTopicPartitions, 2L);
+
+        final ConsumerRecords<byte[], byte[]> records = emptyConsumer.poll(Duration.ofMillis(500));
+        assertEquals(0, records.count());

Review comment:
       @mjsax I have changed the PR to make sure that the tests verify the committed offset. It seems to me, however, that the original test author's intent was to verify the position of the consumer after the method call, not whether it committed the offset, especially because the `position()` and `commitAsync()` calls happen outside the methods under test.







[GitHub] [kafka] MarcoLotz commented on a change in pull request #10042: KAFKA-9527: fix NPE when using time-based argument for Stream Resetter

Posted by GitBox <gi...@apache.org>.
MarcoLotz commented on a change in pull request #10042:
URL: https://github.com/apache/kafka/pull/10042#discussion_r610877439



##########
File path: streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
##########
@@ -247,6 +278,32 @@ public void shouldDetermineInternalTopicBasedOnTopicName1() {
         assertTrue(streamsResetter.matchesInternalTopicFormat("appId-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-12323232-topic"));
     }
 
+    @Test
+    public void testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset() {
+        final long[] beginningAndEndOffsets = {0L, 5L, 10L, 20L};

Review comment:
       On this one, it seems to me that a single value is sufficient. As the test name `testResetToDatetimeWhenPartitionIsEmptyResetsToLatestOffset` says, the partition is empty, so the beginning offset equals the end offset. I will change it to use a beginning offset of 5 anyway.



