Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/11/30 13:41:49 UTC

[GitHub] [druid] FrankChen021 opened a new pull request #12008: Fixes a bug in Kafka auto reset

FrankChen021 opened a new pull request #12008:
URL: https://github.com/apache/druid/pull/12008


   
   Fixes #11658 
   
   
   ### Description
   
   The bug is described in detail in the issue above. 
   
   This PR fixes:
   1. the auto reset code, which now uses the least available offset.
   2. the doc, which now states that the auto reset function has nothing to do with the `useEarliestOffset` or `useLatestOffset` configuration parameters.
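
The first fix can be illustrated with a minimal sketch. This is not the code from the PR: `AutoResetSketch` and `resetTarget` are hypothetical names, and the sketch assumes the only inputs are the offset the task wants to fetch next and the least available offset reported for the partition.

```java
import java.util.OptionalLong;

/** Hypothetical helper for illustration only; not part of the Druid code base. */
final class AutoResetSketch
{
  /**
   * Returns the offset to seek to when the requested offset has fallen off
   * the front of the partition, or an empty value when no reset applies.
   */
  static OptionalLong resetTarget(long nextOffset, long leastAvailableOffset)
  {
    if (nextOffset < leastAvailableOffset) {
      // Offsets in [nextOffset, leastAvailableOffset) can no longer be read,
      // so resume from the first offset that is still available.
      return OptionalLong.of(leastAvailableOffset);
    }
    // The requested offset is not behind the start of the partition: no reset.
    return OptionalLong.empty();
  }

  public static void main(String[] args)
  {
    System.out.println(resetTarget(100L, 250L)); // requested offset is behind the partition start
    System.out.println(resetTarget(300L, 250L)); // requested offset is not behind the partition start
  }
}
```

Note that the decision depends only on the two offsets, which is consistent with the doc change: neither `useEarliestOffset` nor `useLatestOffset` appears in it.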
   
   
   
   This PR has:
   - [X] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] FrankChen021 commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r760188750



##########
File path: docs/development/extensions-core/kafka-supervisor-reference.md
##########
@@ -189,7 +189,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning
 | `indexSpecForIntermediatePersists`|                | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.                                                                                                                                                                                     | no (default = same as `indexSpec`)                                                                             |
 | `reportParseExceptions`           | Boolean        | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.                                                                                                                                                                                                                                                       | no (default == false)                                                                                        |
 | `handoffConditionTimeout`         | Long           | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           | no (default == 0)                                                                                            |
-| `resetOffsetAutomatically`        | Boolean        | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.md#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.<br/><br/>If true, Druid will automatically reset to the earlier or latest offset available in Kafka, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) |
+| `resetOffsetAutomatically`        | Boolean        | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.md#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.<br/><br/>If true, Druid will automatically reset to the least offset available in Kafka. Note that this can lead to data being _DROPPED_ without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) |

Review comment:
       updated

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();

Review comment:
       updated

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);

Review comment:
       updated

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
         if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (nextFetchingOffset < leastAvailableOffset) {
+          // reset offset to the least available position since it's unable to read messages from nextFetchingOffset
+          recordSupplier.seek(streamPartition, leastAvailableOffset);
+

Review comment:
       updated






[GitHub] [druid] OliveBZH commented on pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
OliveBZH commented on pull request #12008:
URL: https://github.com/apache/druid/pull/12008#issuecomment-1073650372


   Hello, can we get a status update on this PR? Is it planned to be merged shortly? Thanks.




[GitHub] [druid] samarthjain commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r761445607



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {

Review comment:
       Thanks for the clarification, @gianm! 
   
   @FrankChen021 - I would probably get rid of the log line I mentioned then and keep the else block as it is (but moved to be the else branch of `if (outOfRangeOffset < earliestAvailableOffset)`).






[GitHub] [druid] gianm commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r761436330



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {

Review comment:
       For a partition that we've already committed some offset for, the starting offset given to a task for that partition is going to be the committed offset plus 1. So if no new messages have been written since the last commit, the starting offset will not exist yet.
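
A small sketch may make the distinction concrete. It is an illustration under stated assumptions, not Druid code: `OffsetRangeSketch`, `Position`, and `classify` are hypothetical names, and `endOffset` is assumed to be the offset that the next produced message would receive.

```java
/** Hypothetical illustration only; not part of the Druid code base. */
final class OffsetRangeSketch
{
  enum Position { EXPIRED, AVAILABLE, NOT_YET_WRITTEN }

  /**
   * Classifies the starting offset of a task relative to the offsets that the
   * partition currently holds, i.e. the range [earliestOffset, endOffset).
   */
  static Position classify(long committedOffset, long earliestOffset, long endOffset)
  {
    // Per the comment above: the task starts at the committed offset plus 1.
    final long startingOffset = committedOffset + 1;

    if (startingOffset < earliestOffset) {
      // The messages were removed before they could be read.
      return Position.EXPIRED;
    }
    if (startingOffset >= endOffset) {
      // Nothing has been written since the last commit; the offset does not exist yet.
      return Position.NOT_YET_WRITTEN;
    }
    return Position.AVAILABLE;
  }

  public static void main(String[] args)
  {
    // Committed up to 41 and the partition ends at 42: offset 42 is not written yet.
    System.out.println(classify(41L, 0L, 42L));
    // Committed up to 41 but the partition now starts at 100: offsets 42..99 are gone.
    System.out.println(classify(41L, 100L, 500L));
  }
}
```

Only the `EXPIRED` case corresponds to the `outOfRangeOffset < earliestAvailableOffset` branch under review; the `NOT_YET_WRITTEN` case is out of range for a different reason and does not imply lost data.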






[GitHub] [druid] FrankChen021 commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r771807303



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {
+          //
+          // In this case, it's probably because partition expires before the Druid could read from next offset
+          // so the messages in [outOfRangeOffset, earliestAvailableOffset) is lost.
+          // These lost messages could not be restored even a manual reset is performed
+          // So, it's reasonable to reset the offset the earliest available position
+          //
+          recordSupplier.seek(streamPartition, earliestAvailableOffset);
+          newOffsetInMetadata.put(topicPartition, outOfRangeOffset);

Review comment:
       It should be the `earliestAvailableOffset`. This is a silly mistake. Thanks for pointing it out.
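
As a sketch of the corrected behavior, assuming plain integer partition ids stand in for `TopicPartition` and that the map is later handed to the reset request: `ResetMetadataSketch` and `offsetsToStore` are hypothetical names, and only the choice of the stored value mirrors the fix discussed here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration only; not part of the Druid code base. */
final class ResetMetadataSketch
{
  /**
   * For every partition whose requested offset is behind the earliest available
   * offset, records the earliest available offset as the new position.
   */
  static Map<Integer, Long> offsetsToStore(
      Map<Integer, Long> outOfRangeOffsets,
      Map<Integer, Long> earliestAvailableOffsets
  )
  {
    final Map<Integer, Long> newOffsetInMetadata = new HashMap<>();
    for (Map.Entry<Integer, Long> entry : outOfRangeOffsets.entrySet()) {
      final Long earliest = earliestAvailableOffsets.get(entry.getKey());
      if (earliest == null) {
        throw new IllegalStateException("no earliest offset for partition " + entry.getKey());
      }
      if (entry.getValue() < earliest) {
        // Store the offset ingestion resumes from, not the unreadable one.
        newOffsetInMetadata.put(entry.getKey(), earliest);
      }
    }
    return newOffsetInMetadata;
  }

  public static void main(String[] args)
  {
    final Map<Integer, Long> outOfRange = new HashMap<>();
    outOfRange.put(0, 100L);
    final Map<Integer, Long> earliest = new HashMap<>();
    earliest.put(0, 250L);
    System.out.println(offsetsToStore(outOfRange, earliest));
  }
}
```

Storing the out-of-range offset instead would leave the recorded position pointing at messages that can no longer be read.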






[GitHub] [druid] samarthjain commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r760673991



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {
+          //
+          // In this case, it's probably because partition expires before the Druid could read from next offset
+          // so the messages in [outOfRangeOffset, earliestAvailableOffset) is lost.
+          // These lost messages could not be restored even a manual reset is performed
+          // So, it's reasonable to reset the offset the earliest available position
+          //
+          recordSupplier.seek(streamPartition, earliestAvailableOffset);
+          newOffsetInMetadata.put(topicPartition, outOfRangeOffset);
+        } else {
+          //
+          // There are two cases in theory here

Review comment:
       Good explanation!

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {
+          //
+          // In this case, it's probably because partition expires before the Druid could read from next offset
+          // so the messages in [outOfRangeOffset, earliestAvailableOffset) is lost.
+          // These lost messages could not be restored even a manual reset is performed
+          // So, it's reasonable to reset the offset the earliest available position
+          //
+          recordSupplier.seek(streamPartition, earliestAvailableOffset);
+          newOffsetInMetadata.put(topicPartition, outOfRangeOffset);

Review comment:
       Should this be `earliestAvailableOffset` and not `outOfRangeOffset`?  I will have to trace the `sendResetRequestAndWait` code to better understand what it expects the offset to be. 

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {
+          //
+          // In this case, it's probably because partition expires before the Druid could read from next offset

Review comment:
       Suggestion - this reads a little better. 
   In this case, it's probably because the messages are no longer in the Kafka cluster i.e. the messages in [outOfRangeOffset, earliestAvailableOffset) are lost. Since these lost messages can no longer be recovered, it's reasonable to reset the offset to the earliest available position to help ingestion resume. 
   
   Would also help to have a log line here. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] samarthjain commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r760785427



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {

Review comment:
       I think this logic can be refined further to call reset within this block itself. Also, it looks like an `OffsetOutOfRangeException` is thrown when the offset for the partition is either larger or smaller than the range of offsets the server has for the given partition. So the case of `earliestAvailableOffset <= outOfRangeOffset <= latestAvailableOffset` doesn't apply. 
   
   So it could look something like this:
   ```
           if (outOfRangeOffset < earliestAvailableOffset) {
             // In this case, it's probably because the messages are no longer in the Kafka cluster i.e. the messages in
             // [outOfRangeOffset, earliestAvailableOffset) are lost. Since these lost messages can no longer be
             // recovered, it's reasonable to reset the offset to the earliest available position to help ingestion resume.
   
             log.warn("Seeking kafka offset to earliest offset: %d", earliestAvailableOffset);
             recordSupplier.seek(streamPartition, earliestAvailableOffset);
             // TBD: still need to confirm if this should be outOfRangeOffset or earliestAvailableOffset
             newOffsetInMetadata.put(topicPartition, outOfRangeOffset);
             log.warn("Resetting offset in metadata for %s to earliest offset: %d", topicPartition, earliestAvailableOffset);
             // The lambda parameter is named tp because streamPartition is already a local variable in this scope.
             sendResetRequestAndWait(CollectionUtils.mapKeys(
                 newOffsetInMetadata,
                 tp -> StreamPartition.of(tp.topic(), tp.partition())
             ), taskToolbox);
           } else {
             // With the offset not in range (earliestAvailableOffset, latestAvailableOffset), there is not much we can do
             // but wait for the available offsets in the partition to arrive in the range.
             log.warn(
                 "Offset %d is out of range of the available offsets for %s. "
                 + "It is likely that a manual offset reset of the supervisor is needed",
                 outOfRangeOffset,
                 topicPartition
             );
   
             log.warn("Retrying in %dms", task.getPollRetryMs());
             pollRetryLock.lockInterruptibly();
             try {
               long nanos = TimeUnit.MILLISECONDS.toNanos(task.getPollRetryMs());
               // Loop to absorb spurious wakeups; stop early on pause or stop requests.
               while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
                 nanos = isAwaitingRetry.awaitNanos(nanos);
               }
             }
             finally {
               pollRetryLock.unlock();
             }
           }
   ```
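
The branch condition in the suggestion above can be isolated as a small pure function, which makes the boundary case easy to check. This is only a sketch: `OffsetResetDecision`, `Action`, and `decide` are hypothetical names that do not exist in Druid, and the sketch assumes, as the comment above argues, that an out-of-range offset is either below the earliest available offset or beyond the latest one.

```java
/** Hypothetical helper, not part of Druid: decides how to react to an out-of-range offset. */
final class OffsetResetDecision
{
  enum Action
  {
    /** Messages in [outOfRangeOffset, earliestAvailableOffset) are gone; resume from the earliest offset. */
    RESET_TO_EARLIEST,
    /** The requested offset is not behind the earliest offset; wait and poll again. */
    WAIT_AND_RETRY
  }

  private OffsetResetDecision()
  {
  }

  static Action decide(long outOfRangeOffset, long earliestAvailableOffset)
  {
    // Strict comparison: an offset equal to the earliest available offset is still readable.
    return outOfRangeOffset < earliestAvailableOffset
           ? Action.RESET_TO_EARLIEST
           : Action.WAIT_AND_RETRY;
  }
}
```

For example, `OffsetResetDecision.decide(100L, 250L)` selects `RESET_TO_EARLIEST`, while an offset equal to or above the earliest available offset selects `WAIT_AND_RETRY`.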






[GitHub] [druid] samarthjain commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r759879813



##########
File path: docs/development/extensions-core/kafka-supervisor-reference.md
##########
@@ -189,7 +189,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning
 | `indexSpecForIntermediatePersists`|                | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.                                                                                                                                                                                     | no (default = same as `indexSpec`)                                                                             |
 | `reportParseExceptions`           | Boolean        | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.                                                                                                                                                                                                                                                       | no (default == false)                                                                                        |
 | `handoffConditionTimeout`         | Long           | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           | no (default == 0)                                                                                            |
-| `resetOffsetAutomatically`        | Boolean        | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.md#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.<br/><br/>If true, Druid will automatically reset to the earlier or latest offset available in Kafka, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) |
+| `resetOffsetAutomatically`        | Boolean        | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.md#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.<br/><br/>If true, Druid will automatically reset to the least offset available in Kafka. Note that this can lead to data being _DROPPED_ without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) |

Review comment:
       This matches the behavior I have observed. I don't see where the `useEarliestOffset` property is actually being used. 

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();

Review comment:
       Would a better name be `outOfRangeOffset`? 

##########
File path: docs/development/extensions-core/kafka-supervisor-reference.md
##########
@@ -189,7 +189,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning
 | `indexSpecForIntermediatePersists`|                | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.                                                                                                                                                                                     | no (default = same as `indexSpec`)                                                                             |
 | `reportParseExceptions`           | Boolean        | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.                                                                                                                                                                                                                                                       | no (default == false)                                                                                        |
 | `handoffConditionTimeout`         | Long           | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           | no (default == 0)                                                                                            |
-| `resetOffsetAutomatically`        | Boolean        | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.md#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.<br/><br/>If true, Druid will automatically reset to the earlier or latest offset available in Kafka, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) |
+| `resetOffsetAutomatically`        | Boolean        | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../operations/api-reference.md#supervisors). This mode is useful for production, since it will make you aware of issues with ingestion.<br/><br/>If true, Druid will automatically reset to the least offset available in Kafka. Note that this can lead to data being _DROPPED_ without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) |

Review comment:
       Kafka terminology is EARLIEST or LATEST offset. We should probably still have it as `earliest offset available in Kafka`.

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
         if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (nextFetchingOffset < leastAvailableOffset) {
+          // reset offset to the least available position since it's unable to read messages from nextFetchingOffset
+          recordSupplier.seek(streamPartition, leastAvailableOffset);
+

Review comment:
       nit: can probably get rid of the empty line here. 

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);

Review comment:
       I would probably name it as `earliestAvailableOffset` to match Kafka terminology. But it's ok if you don't do it in this PR :). 

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
         if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);

Review comment:
       I am not 100% sure we can get rid of this seek. Its purpose is to reset the offset back to what it was before `getEarliestSequenceNumber` was called. 
   
   
   `recordSupplier.getEarliestSequenceNumber(streamPartition)` is doing the following:
   ```
   Long currPos = getPosition(partition);
   seekToEarliest(Collections.singleton(partition));
   Long nextPos = getPosition(partition);
   seek(partition, currPos);
   ```
   If it can be guaranteed that `currPos` (from `Long currPos = getPosition(partition)`) holds the same offset as `nextOffset`, then we can get rid of this seek. 
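
To make the save-and-restore pattern quoted above concrete, here is a minimal sketch against an in-memory stand-in. `FakePartitionConsumer` and its `withPartition` method are hypothetical and are neither Druid's `RecordSupplier` nor Kafka's consumer API; returning `nextPos` is also an assumption, since the quoted snippet does not show the return statement.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a consumer; not Druid's or Kafka's API. */
final class FakePartitionConsumer
{
  private final Map<Integer, Long> earliestOffsets = new HashMap<>();
  private final Map<Integer, Long> positions = new HashMap<>();

  /** Registers a partition with its earliest available offset and the current read position. */
  FakePartitionConsumer withPartition(int partition, long earliestOffset, long currentPosition)
  {
    earliestOffsets.put(partition, earliestOffset);
    positions.put(partition, currentPosition);
    return this;
  }

  long getPosition(int partition)
  {
    return positions.get(partition);
  }

  void seek(int partition, long offset)
  {
    positions.put(partition, offset);
  }

  void seekToEarliest(int partition)
  {
    positions.put(partition, earliestOffsets.get(partition));
  }

  /** Mirrors the four quoted steps: remember the position, probe the earliest offset, then restore. */
  long getEarliestSequenceNumber(int partition)
  {
    long currPos = getPosition(partition);
    seekToEarliest(partition);
    long nextPos = getPosition(partition);
    seek(partition, currPos);
    return nextPos; // assumption: the probed earliest offset is what gets returned
  }
}
```

In this sketch the position after the call equals the position before it, so an extra seek afterwards only matters if the caller needs the position to be something other than what the consumer already held, which is the guarantee the comment above asks about.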






[GitHub] [druid] samarthjain commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r760673991



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {
+          //
+          // In this case, it's probably because partition expires before the Druid could read from next offset
+          // so the messages in [outOfRangeOffset, earliestAvailableOffset) is lost.
+          // These lost messages could not be restored even a manual reset is performed
+          // So, it's reasonable to reset the offset the earliest available position
+          //
+          recordSupplier.seek(streamPartition, earliestAvailableOffset);
+          newOffsetInMetadata.put(topicPartition, outOfRangeOffset);
+        } else {
+          //
+          // There are two cases in theory here

Review comment:
       Good explanation!






[GitHub] [druid] samarthjain commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r761332196



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {

Review comment:
       @gianm - could you explain the scenario in which the offset we are asking the consumer to seek to could possibly be higher than the latest available offset in Kafka? 
   ```
     @Nonnull
     @Override
     protected List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> getRecords(
         RecordSupplier<Integer, Long, KafkaRecordEntity> recordSupplier,
         TaskToolbox toolbox
     ) throws Exception
     {
       try {
         return recordSupplier.poll(task.getIOConfig().getPollTimeout());
       }
       catch (OffsetOutOfRangeException e) {
         //
         // Handles OffsetOutOfRangeException, which is thrown if the seeked-to
         // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
         // that has not been written yet (which is totally legitimate). So let's wait for it to show up
         //
         log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
         possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), recordSupplier, toolbox);
         return Collections.emptyList();
       }
     }
   ```
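
For illustration, the two ways an offset can fall out of range can be told apart by comparing it with the earliest available offset, as the diff above does. The sketch below is a minimal stand-alone model, not the Druid implementation: the class `OutOfRangeDecision`, the enum `Action`, and the method `decide` are hypothetical names introduced here.

```java
class OutOfRangeDecision {
    // Hypothetical outcome names, introduced only for this sketch.
    enum Action { RESET_TO_EARLIEST, WAIT_FOR_DATA }

    // Mirrors the comparison in the diff: an offset below the earliest available
    // one points at data that is gone (for example, expired by retention), so a
    // reset is reasonable. Otherwise the offset is assumed to point at data that
    // has not been written yet, and the task can wait.
    static Action decide(long outOfRangeOffset, long earliestAvailableOffset) {
        return outOfRangeOffset < earliestAvailableOffset
               ? Action.RESET_TO_EARLIEST
               : Action.WAIT_FOR_DATA;
    }

    public static void main(String[] args) {
        System.out.println(decide(100L, 250L));
        System.out.println(decide(900L, 250L));
    }
}
```

An offset equal to the earliest available one is treated as "wait" here, because the comparison is strict.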




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] FrankChen021 commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r771807517



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {

Review comment:
       @samarthjain I kept the code block as it was because the original code resets the metadata for all partitions outside the partition loop, and does so only once.
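
The "collect inside the loop, reset once after it" pattern described above can be sketched as follows. This is a simplified model under stated assumptions: partitions are plain `String` keys, and `sendResetRequest` is a hypothetical stand-in that only counts calls, not a Druid API.

```java
import java.util.HashMap;
import java.util.Map;

class SingleResetSketch {
    // Counts reset requests so the "only once" behaviour is observable.
    static int resetRequests = 0;

    // Hypothetical stand-in for the call that resets stored offsets.
    static void sendResetRequest(Map<String, Long> partitions) {
        resetRequests++;
    }

    static Map<String, Long> collectAndReset(Map<String, Long> outOfRange, Map<String, Long> earliest) {
        final Map<String, Long> newOffsetInMetadata = new HashMap<>();
        for (Map.Entry<String, Long> entry : outOfRange.entrySet()) {
            final Long earliestAvailableOffset = earliest.get(entry.getKey());
            // Only partitions whose requested offset has fallen below the
            // earliest available offset are collected for the reset.
            if (earliestAvailableOffset != null && entry.getValue() < earliestAvailableOffset) {
                newOffsetInMetadata.put(entry.getKey(), entry.getValue());
            }
        }
        // A single request after the loop covers every collected partition.
        if (!newOffsetInMetadata.isEmpty()) {
            sendResetRequest(newOffsetInMetadata);
        }
        return newOffsetInMetadata;
    }

    public static void main(String[] args) {
        Map<String, Long> outOfRange = new HashMap<>();
        outOfRange.put("topic-0", 100L);
        outOfRange.put("topic-1", 120L);
        Map<String, Long> earliest = new HashMap<>();
        earliest.put("topic-0", 250L);
        earliest.put("topic-1", 300L);
        System.out.println(collectAndReset(outOfRange, earliest).size() + " partitions, "
                           + resetRequests + " request(s)");
    }
}
```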






[GitHub] [druid] FrankChen021 commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r760185795



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,35 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long nextFetchingOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
+
         final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
         if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);

Review comment:
       `getEarliestSequenceNumber` guarantees that, after the call returns, the consumer's offset is back where it was before the call, so the explicit seek is no longer needed.
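
One way such a guarantee could be provided is to save the current position, seek to the beginning to read the earliest offset, and then restore the saved position. The sketch below shows that pattern against a hypothetical in-memory `FakeConsumer`; it is an assumption about the general technique, not a copy of the Druid record supplier.

```java
class PositionPreservingLookup {
    // Hypothetical in-memory stand-in for a single-partition consumer.
    static final class FakeConsumer {
        private final long earliest;
        private long position;

        FakeConsumer(long earliest, long position) {
            this.earliest = earliest;
            this.position = position;
        }

        long position() { return position; }
        void seek(long offset) { position = offset; }
        void seekToBeginning() { position = earliest; }
    }

    // Returns the earliest offset and leaves the consumer position unchanged.
    static long earliestOffset(FakeConsumer consumer) {
        final long saved = consumer.position();
        try {
            consumer.seekToBeginning();
            return consumer.position();
        } finally {
            // Restore the position even if the lookup throws.
            consumer.seek(saved);
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(250L, 100L);
        System.out.println("earliest=" + earliestOffset(consumer)
                           + ", position=" + consumer.position());
    }
}
```

With a lookup shaped like this, the caller has no reason to seek back to the previous offset afterwards.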






[GitHub] [druid] samarthjain commented on a change in pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
samarthjain commented on a change in pull request #12008:
URL: https://github.com/apache/druid/pull/12008#discussion_r760673936



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
##########
@@ -126,38 +126,56 @@ private void possiblyResetOffsetsOrWait(
       TaskToolbox taskToolbox
   ) throws InterruptedException, IOException
   {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
+    final Map<TopicPartition, Long> newOffsetInMetadata = new HashMap<>();
+
     if (task.getTuningConfig().isResetOffsetAutomatically()) {
       for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
         final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
+        final long outOfRangeOffset = outOfRangePartition.getValue();
+
         StreamPartition<Integer> streamPartition = StreamPartition.of(
             topicPartition.topic(),
             topicPartition.partition()
         );
-        final Long leastAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
-        if (leastAvailableOffset == null) {
-          throw new ISE(
-              "got null sequence number for partition[%s] when fetching from kafka!",
-              topicPartition.partition()
-          );
+
+        final Long earliestAvailableOffset = recordSupplier.getEarliestSequenceNumber(streamPartition);
+        if (earliestAvailableOffset == null) {
+          throw new ISE("got null earliest sequence number for partition[%s] when fetching from kafka!",
+                        topicPartition.partition());
         }
-        // reset the seek
-        recordSupplier.seek(streamPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
+
+        if (outOfRangeOffset < earliestAvailableOffset) {
+          //
+          // In this case, it's probably because partition expires before the Druid could read from next offset
+          // so the messages in [outOfRangeOffset, earliestAvailableOffset) is lost.
+          // These lost messages could not be restored even a manual reset is performed
+          // So, it's reasonable to reset the offset the earliest available position
+          //
+          recordSupplier.seek(streamPartition, earliestAvailableOffset);
+          newOffsetInMetadata.put(topicPartition, outOfRangeOffset);
+        } else {
+          //

Review comment:
       Nit: remove






[GitHub] [druid] FrankChen021 commented on pull request #12008: Fixes a bug in Kafka auto reset

Posted by GitBox <gi...@apache.org>.
FrankChen021 commented on pull request #12008:
URL: https://github.com/apache/druid/pull/12008#issuecomment-985985447


   @samarthjain I've been quite exhausted these days. Please give me a few more days to resolve your comments. Thanks!

