Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/03/09 00:54:33 UTC

[GitHub] [pinot] npawar opened a new pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

npawar opened a new pull request #8309:
URL: https://github.com/apache/pinot/pull/8309


   For https://github.com/apache/pinot/issues/8219
   
   The root cause of the issue is that Kafka sets the OffsetResetStrategy in SubscriptionState to "latest" by default.
   From `KafkaConsumer`
   ```
   OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
   this.subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
   ```
   And from `ConsumerConfig`:
   ```
   .define(AUTO_OFFSET_RESET_CONFIG,
           Type.STRING,
           "latest",
           in("latest", "earliest", "none"),
           Importance.MEDIUM,
           AUTO_OFFSET_RESET_DOC)
   ```
   As a result, when `_consumer.seek(offset)` targets an offset that is out of range, the fetch position gets reset to `latest`.
   This PR adds a consumer config to reset it to `earliest` instead.
   
   Verified via tests in KafkaPartitionConsumerTest, and also manually, that this doesn't affect table consumption from smallest/largest in any way.
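
For readers unfamiliar with the setting, here is a minimal sketch of what pinning the reset policy looks like at the properties level. It uses the raw key `auto.offset.reset` (the string behind `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`) so that it runs without the Kafka client on the classpath. The class and method names are hypothetical and are not part of this PR.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; this is not Pinot or Kafka code.
final class ConsumerPropsSketch {
  // Raw key behind ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.
  static final String AUTO_OFFSET_RESET = "auto.offset.reset";

  static Properties withEarliestReset(String bootstrapServers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    // If this key is left unset, the Kafka client falls back to its default, "latest".
    props.put(AUTO_OFFSET_RESET, "earliest");
    return props;
  }

  public static void main(String[] args) {
    System.out.println(withEarliestReset("localhost:9092"));
  }
}
```

The resulting `Properties` object would then be handed to the consumer constructor, as `KafkaPartitionLevelConnectionHandler` does with `consumerProp`.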


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] npawar commented on pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
npawar commented on pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#issuecomment-1064565659


   > Thinking more about the other scenario we discussed offline (OOR due to broker trim, such that startOffset > latest): this patch will result in 1. going to earliest, 2. possibly filtering everything out (assuming none of the rows in the batch are > latest), and 3. continuing to increment the offset (data loss). On closer inspection, I realized that this happens even with the current code, which will result in 1. going to latest (instead of earliest), 2. possibly still filtering everything out (though latest is closer than earliest, so there is less chance of that happening), and 3. continuing to increment the offset. The only way to recover from this case is to wait until the expected startOffset shows up again in the stream.
   > 
   > This is a rare occurrence, and I feel it is okay to have the slight inefficiency because of setting OffsetResetStrategy to earliest.
   > 
   > The other option for us is #8321: explicitly checking for out of range where `startOffset < beginning`. But this one introduces an additional call to the consumer API in every fetchMessages call.
   
   Any thoughts on which way we should address this, @xiangfu0 @mayankshriv? ^^




[GitHub] [pinot] npawar commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821288753



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
##########
@@ -112,4 +115,9 @@ public void deleteTopic(String topicName)
       throws ExecutionException, InterruptedException {
     _adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
   }
+
+  public void deleteRecordsBeforeOffset(String topicName, int partitionId, long offset) {
+    _adminClient.deleteRecords(

Review comment:
       Not sure. It seems to return gracefully. This is only used in the tests.






[GitHub] [pinot] npawar closed pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
npawar closed pull request #8309:
URL: https://github.com/apache/pinot/pull/8309


   




[GitHub] [pinot] navina commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
navina commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821266312



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
##########
@@ -56,6 +57,7 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
     consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());

Review comment:
       Can you make this user-configurable? We don't know what caused the OOR or what behavior the user expects. Also, since this changes the default behavior, it is not a backward-compatible change. 
   
   I would recommend keeping "LATEST" as the default and overriding it only if the user has provided an override. 

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
##########
@@ -112,4 +115,9 @@ public void deleteTopic(String topicName)
       throws ExecutionException, InterruptedException {
     _adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
   }
+
+  public void deleteRecordsBeforeOffset(String topicName, int partitionId, long offset) {
+    _adminClient.deleteRecords(

Review comment:
       Ooh, I have never used this API. I am curious what happens when the offset provided here is invalid: does it throw, or does it just ignore the call?






[GitHub] [pinot] npawar commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821287602



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
##########
@@ -56,6 +57,7 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
     consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());

Review comment:
       Sure, I made sure it won't override a user-provided config for this.
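
One way to get "default to earliest, but never override the user" is `putIfAbsent`. The sketch below is an assumption about the shape of such a fix, not the code that was pushed to this PR. The class name and the `userProps` map are hypothetical, and the raw key `auto.offset.reset` stands in for `ConsumerConfig.AUTO_OFFSET_RESET_CONFIG`.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch; the actual change in the PR may be structured differently.
final class OffsetResetOverrideSketch {
  static final String AUTO_OFFSET_RESET = "auto.offset.reset";

  // userProps models consumer properties supplied through the stream config.
  // Null values are not supported because Properties rejects them.
  static Properties build(Map<String, String> userProps) {
    Properties props = new Properties();
    props.putAll(userProps);
    // Only applies the default when the user has not set the key themselves.
    props.putIfAbsent(AUTO_OFFSET_RESET, "earliest");
    return props;
  }

  public static void main(String[] args) {
    System.out.println(build(java.util.Collections.singletonMap(AUTO_OFFSET_RESET, "latest")));
  }
}
```

With this shape, a table that explicitly sets `latest` keeps its existing behavior, and only tables that leave the key unset pick up the new default.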






[GitHub] [pinot] codecov-commenter commented on pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#issuecomment-1061391619


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8309?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8309](https://codecov.io/gh/apache/pinot/pull/8309?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (46fdff0) into [master](https://codecov.io/gh/apache/pinot/commit/3f98ce37fdaef0335fcd82e621489d65751b1f55?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3f98ce3) will **decrease** coverage by `40.03%`.
   > The diff coverage is `50.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/8309/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/8309?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #8309       +/-   ##
   =============================================
   - Coverage     70.72%   30.69%   -40.04%     
   =============================================
     Files          1631     1623        -8     
     Lines         85279    85306       +27     
     Branches      12844    12861       +17     
   =============================================
   - Hits          60316    26181    -34135     
   - Misses        20799    56782    +35983     
   + Partials       4164     2343     -1821     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `28.85% <50.00%> (+0.16%)` | :arrow_up: |
   | integration2 | `27.68% <50.00%> (+0.19%)` | :arrow_up: |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8309?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../kafka20/KafkaPartitionLevelConnectionHandler.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25uZWN0aW9uSGFuZGxlci5qYXZh) | `90.47% <50.00%> (-9.53%)` | :arrow_down: |
   | [.../java/org/apache/pinot/spi/utils/BooleanUtils.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQm9vbGVhblV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/config/table/FSTType.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL0ZTVFR5cGUuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/data/MetricFieldSpec.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvZGF0YS9NZXRyaWNGaWVsZFNwZWMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/pinot/spi/utils/BigDecimalUtils.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQmlnRGVjaW1hbFV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/common/tier/TierFactory.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdGllci9UaWVyRmFjdG9yeS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...a/org/apache/pinot/spi/config/table/TableType.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL1RhYmxlVHlwZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/pinot/spi/data/DimensionFieldSpec.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvZGF0YS9EaW1lbnNpb25GaWVsZFNwZWMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../org/apache/pinot/spi/data/readers/FileFormat.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvZGF0YS9yZWFkZXJzL0ZpbGVGb3JtYXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...org/apache/pinot/spi/config/table/QuotaConfig.java](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL1F1b3RhQ29uZmlnLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1143 more](https://codecov.io/gh/apache/pinot/pull/8309/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8309?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8309?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [3f98ce3...46fdff0](https://codecov.io/gh/apache/pinot/pull/8309?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] navina commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
navina commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821977788



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
##########
@@ -56,6 +57,7 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
     consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());

Review comment:
       > I would treat this as a bug, instead of a configurable behavior. 
   
   Even if it is a bug, the fix changes the existing behavior. So I would expect users to be alarmed if it suddenly resets to earliest and they see duplicate data. That's my 2 cents. 






[GitHub] [pinot] npawar commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821287802



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
##########
@@ -56,6 +57,7 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
     consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());

Review comment:
       Actually, we do know what caused the OOR: the server was down for a while, and when it came back up, the offsets had expired. This would be the most common case (along with any pause we add in the future). Such a user would always want consumption to resume with minimal data loss.
   I would treat this as a bug rather than a configurable behavior. By default we reset to latest and cause a lot more data loss even though rows are present. With this change, we move forward to the next point from which we can consume. The behavior is similar to ValidationManager.
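
To put numbers on the difference, here is a small model of the "server was down and offsets expired" case. It is a simplification under stated assumptions, not Kafka code: the partition is assumed to retain offsets in `[beginning, end)`, the consumer asks for a `startOffset` below `beginning`, and the offsets in the example are made up.

```java
// Simplified model for illustration; names and numbers are hypothetical.
final class ResetLossSketch {
  enum Reset { EARLIEST, LATEST }

  // Where consumption resumes after an out-of-range reset.
  static long resumePosition(long beginning, long end, Reset reset) {
    return reset == Reset.EARLIEST ? beginning : end;
  }

  // Rows between the requested offset and the resume position are never consumed.
  static long rowsSkipped(long startOffset, long beginning, long end, Reset reset) {
    return resumePosition(beginning, end, reset) - startOffset;
  }

  public static void main(String[] args) {
    long startOffset = 100, beginning = 150, end = 500;
    System.out.println("earliest skips " + rowsSkipped(startOffset, beginning, end, Reset.EARLIEST));
    System.out.println("latest skips " + rowsSkipped(startOffset, beginning, end, Reset.LATEST));
  }
}
```

In this example, 50 rows have already expired and are lost under either policy. Resetting to latest additionally skips the 350 rows that are still present in the stream.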
   






[GitHub] [pinot] npawar commented on pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
npawar commented on pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#issuecomment-1062441335


   Thinking more about the other scenario we discussed offline (OOR due to broker trim, such that startOffset > latest): this patch will result in 1. going to earliest, 2. possibly filtering everything out (assuming none of the rows in the batch are > latest), and 3. continuing to increment the offset (data loss). 
   On closer inspection, I realized that this happens even with the current code, which will result in 1. going to latest (instead of earliest), 2. possibly still filtering everything out (though latest is closer than earliest, so there is less chance of that happening), and 3. continuing to increment the offset.
   The only way to recover from this case is to wait until the expected startOffset shows up again in the stream.
   
   This is a rare occurrence, and I feel it is okay to have the slight inefficiency because of setting OffsetResetStrategy to earliest.
   
   The other option for us is https://github.com/apache/pinot/pull/8321: explicitly checking for out of range where `startOffset < beginning`.
   But this one introduces an additional call to the consumer API in every fetchMessages call.
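
The explicit-check alternative can be sketched roughly as follows. This is not the code from #8321. The class name is hypothetical, and the `LongSupplier` stands in for whatever consumer API call returns the partition's beginning offset, which is the extra per-fetch call mentioned above.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the explicit out-of-range check; not taken from #8321.
final class ExplicitRangeCheckSketch {
  // beginningOffsetLookup models the additional consumer API call made on every fetch.
  static long resolveStartOffset(long startOffset, LongSupplier beginningOffsetLookup) {
    long beginning = beginningOffsetLookup.getAsLong();
    // Only the startOffset < beginning case is corrected; an in-range offset is left alone.
    return startOffset < beginning ? beginning : startOffset;
  }

  public static void main(String[] args) {
    System.out.println(resolveStartOffset(100L, () -> 150L));
    System.out.println(resolveStartOffset(200L, () -> 150L));
  }
}
```

The trade-off is the one stated above: the check is precise about which case it handles, but the lookup runs on every fetch, whereas the reset-strategy config costs nothing until an out-of-range fetch actually happens.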
   
   
   




[GitHub] [pinot] npawar commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821288060



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
##########
@@ -56,6 +57,7 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
     consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());

Review comment:
       Not sure what the setting would be for the 09 module. There's no KafkaConsumer there, and I cannot see a way to add such props.






[GitHub] [pinot] navina commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
navina commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821975320



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
##########
@@ -56,6 +57,7 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
     consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());

Review comment:
       >not sure what the setting would be for 09 module.. there's no KafkaConsumer there, and cannot see a way to add such props. 
   
   Oh my. I just checked the code. It is using the ZK-based consumer. OK, not worth fixing for this version unless the same issue pops up. 
   






[GitHub] [pinot] navina commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
navina commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821972611



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/utils/MiniKafkaCluster.java
##########
@@ -112,4 +115,9 @@ public void deleteTopic(String topicName)
       throws ExecutionException, InterruptedException {
     _adminClient.deleteTopics(Collections.singletonList(topicName)).all().get();
   }
+
+  public void deleteRecordsBeforeOffset(String topicName, int partitionId, long offset) {
+    _adminClient.deleteRecords(

Review comment:
       ok






[GitHub] [pinot] navina commented on a change in pull request #8309: KafkaConsumer to reset offset to earliest when fetch position out of range

Posted by GitBox <gi...@apache.org>.
navina commented on a change in pull request #8309:
URL: https://github.com/apache/pinot/pull/8309#discussion_r821269269



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
##########
@@ -56,6 +57,7 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
     consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, _config.getBootstrapHosts());
     consumerProp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     consumerProp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());

Review comment:
       I think this option is available in older Kafka versions too. So, should we make the changes in the `kafka09` module as well? 



