Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/12/18 12:42:48 UTC

[GitHub] [pinot] richardstartin opened a new pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

richardstartin opened a new pull request #7927:
URL: https://github.com/apache/pinot/pull/7927


   eb6800da96a44e6f5125097cc99a368c2f8f8847 creates conditions where `LLRealtimeSegmentDataManager` cannot advance if a message batch consumed from Kafka is entirely filtered out, e.g. because it only contains tombstones. `LLCRealtimeClusterIntegrationTest` hangs on this commit.
   
   01f9f16a18b188bd3e15408ad945eda9541caf75 allows offsets to be committed when the entire batch was filtered out. It also fixes some of the myriad warnings and concurrency bugs in `LLRealtimeSegmentDataManager`.
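The stall described above can be reproduced in miniature. The following is only a sketch under stated assumptions, not Pinot code: the `Batch` class, the `fetch`/`nextOffset`/`consume` helpers and the use of `null` array entries as tombstones are all hypothetical, and the offset arithmetic is simplified. It shows why a consumer that only advances when at least one message survives filtering never moves past an all-tombstone batch, while one that also tracks the last offset read does.

```java
import java.util.ArrayList;
import java.util.List;

final class FilteredBatchStallSketch {
  /** Hypothetical batch: surviving payloads, number of records read, offset of the last record read. */
  static final class Batch {
    final List<String> messages;
    final int unfilteredCount;
    final long lastOffset;

    Batch(List<String> messages, int unfilteredCount, long lastOffset) {
      this.messages = messages;
      this.unfilteredCount = unfilteredCount;
      this.lastOffset = lastOffset;
    }
  }

  /** Reads up to maxRecords entries from the log; null entries stand in for tombstones and are dropped. */
  static Batch fetch(String[] log, long startOffset, int maxRecords) {
    List<String> kept = new ArrayList<>();
    long lastOffset = startOffset;
    int read = 0;
    for (long offset = startOffset; offset < log.length && read < maxRecords; offset++) {
      String payload = log[(int) offset];
      if (payload != null) {
        kept.add(payload);
      }
      lastOffset = offset;
      read++;
    }
    return new Batch(kept, read, lastOffset);
  }

  /** Offset to fetch from next; without the flag, a fully filtered batch leaves the offset unchanged. */
  static long nextOffset(Batch batch, long currentOffset, boolean advanceOnFilteredBatch) {
    boolean sawRecords = batch.unfilteredCount > 0;
    boolean keptRecords = !batch.messages.isEmpty();
    if (keptRecords || (advanceOnFilteredBatch && sawRecords)) {
      return batch.lastOffset + 1;
    }
    return currentOffset;
  }

  /** Runs at most maxFetches fetches and returns the offset reached. */
  static long consume(String[] log, int batchSize, boolean advanceOnFilteredBatch, int maxFetches) {
    long offset = 0;
    for (int i = 0; i < maxFetches && offset < log.length; i++) {
      offset = nextOffset(fetch(log, offset, batchSize), offset, advanceOnFilteredBatch);
    }
    return offset;
  }

  public static void main(String[] args) {
    String[] log = {"a", "b", null, null, "c", "d"};
    System.out.println(consume(log, 2, false, 10)); // stuck in front of the all-tombstone batch
    System.out.println(consume(log, 2, true, 10));  // reaches the end of the log
  }
}
```

The bounded `maxFetches` loop is there only so the stalled variant terminates; a real consumer would keep polling the same offset.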


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821814



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -543,14 +552,14 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
 
       _currentOffset = messagesAndOffsets.getNextStreamParitionMsgOffsetAtIndex(index);
       _numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
-      _numRowsConsumed++;
       streamMessageCount++;
+      NUM_ROWS_CONSUMED_UPDATER.incrementAndGet(this);
     }
     updateCurrentDocumentCountMetrics();
     if (streamMessageCount != 0) {
       _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}", indexedMessageCount,
           streamMessageCount, _currentOffset);
-    } else {
+    } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {

Review comment:
       Prevents unnecessary latency after a bad batch: there is probably more data waiting to be consumed.
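As a rough illustration of the branch in the diff above, the decision can be written as a pure function. This is a sketch, not the actual method: the `Action` enum and `afterBatch` are hypothetical names, and it assumes the `else` branch is where the consumer sleeps before polling again (the diff does not show that body).

```java
final class IdleDecisionSketch {
  enum Action { LOG_PROGRESS, IDLE, RETRY_IMMEDIATELY }

  /**
   * streamMessageCount: messages that survived filtering and were processed.
   * unfilteredMessageCount: records read from the stream before filtering.
   */
  static Action afterBatch(int streamMessageCount, int unfilteredMessageCount) {
    if (streamMessageCount != 0) {
      return Action.LOG_PROGRESS;
    } else if (unfilteredMessageCount == 0) {
      // Nothing was read at all, so back off to avoid hammering the stream.
      return Action.IDLE;
    }
    // Records were read but all were filtered out: more data is probably waiting.
    return Action.RETRY_IMMEDIATELY;
  }

  public static void main(String[] args) {
    System.out.println(afterBatch(5, 5));
    System.out.println(afterBatch(0, 0));
    System.out.println(afterBatch(0, 7));
  }
}
```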






[GitHub] [pinot] richardstartin commented on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-998475535


   > On a separate note, I saw that originally there were some optimizations piggy-backed to this PR. We should avoid doing that. Each PR should only focus on one feature, one bug fix, etc. Any optimization or refactoring should go in a separate PR. That might take a little longer for the author, but it benefits all of us in the long run.
   
   There were no optimisations in this PR. There were some fixes for concurrency bugs (e.g. it's incorrect to use an increment operator on a volatile variable), so I'm not sure what you're referring to.
   
   > One example of this issue happened last week when we spent a long time finding the root cause for the chunk index writer bug. The problematic optimization was added in a PR with title Add MV raw forward index and MV BYTES data type. 
   
   I'm not sure this is the best place to discuss this, but you are implying that the change you mentioned was unrelated to the PR it was made in, and it wasn't. The purpose of the change, as has been discussed, is that the particular class creates very large buffers. It was included in that PR because its use for MV BYTES columns exacerbates this by multiplying what is already an overestimate of the buffer size by the maximum number of elements in the column. So the change was a mitigation for the worst case.
   
   > The table having performance problem didn't have any multi-value columns or byte data type. So we couldn't easily tell if this commit is related or not. And there were a lot of commits we needed to examine. If we had a separate PR for that optimization, we could've found the root cause much easier!
   
   Looking at commits to figure out what changed isn't an efficient diagnostic technique. Had you instead used a profiler you would have found the process was spending a large amount of time in the syscalls `mmap` and `munmap`; looking at the git blame for each Pinot stack frame above those syscalls would have found the culprit in O(stack depth) time rather than O(lines of code changed). 
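On the earlier point about the increment operator on a volatile variable: `++` on a volatile field is a separate read and write, so concurrent writers can lose updates, whereas `AtomicLongFieldUpdater.incrementAndGet` performs the update atomically. A minimal sketch follows, with hypothetical class and field names; it only demonstrates the general Java behaviour and says nothing about how many threads actually write the field in `LLRealtimeSegmentDataManager`.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class VolatileIncrementSketch {
  private static final AtomicLongFieldUpdater<VolatileIncrementSketch> UPDATER =
      AtomicLongFieldUpdater.newUpdater(VolatileIncrementSketch.class, "_updaterCount");

  private volatile long _plainCount;
  private volatile long _updaterCount;

  void incrementBoth() {
    _plainCount++;                 // volatile read, add, volatile write: not atomic as a whole
    UPDATER.incrementAndGet(this); // atomic read-modify-write
  }

  /** Returns {plain count, updater count} after all threads have finished. */
  static long[] run(int threads, int incrementsPerThread) {
    VolatileIncrementSketch counters = new VolatileIncrementSketch();
    Thread[] workers = new Thread[threads];
    for (int t = 0; t < threads; t++) {
      workers[t] = new Thread(() -> {
        for (int i = 0; i < incrementsPerThread; i++) {
          counters.incrementBoth();
        }
      });
      workers[t].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return new long[] {counters._plainCount, counters._updaterCount};
  }

  public static void main(String[] args) {
    long[] result = run(4, 100_000);
    // The updater count is always threads * incrementsPerThread; the plain count may be lower.
    System.out.println("plain=" + result[0] + " updater=" + result[1]);
  }
}
```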




[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (29e5d20) into [master](https://codecov.io/gh/apache/pinot/commit/58e7f10c7a0bb8e9137377e913797d54f9b89f8a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (58e7f10) will **decrease** coverage by `56.74%`.
   > The diff coverage is `56.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7927       +/-   ##
   =============================================
   - Coverage     71.08%   14.33%   -56.75%     
   + Complexity     4109       80     -4029     
   =============================================
     Files          1593     1548       -45     
     Lines         82373    80508     -1865     
     Branches      12269    12072      -197     
   =============================================
   - Hits          58555    11543    -47012     
   - Misses        19867    68108    +48241     
   + Partials       3951      857     -3094     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `14.33% <56.00%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (-71.82%)` | :arrow_down: |
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `0.00% <ø> (-92.31%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `53.84% <66.66%> (-37.83%)` | :arrow_down: |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `80.00% <76.92%> (-6.67%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/core/data/table/Key.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL0tleS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/spi/utils/BooleanUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQm9vbGVhblV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/data/table/Record.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1JlY29yZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/util/GroupByUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS91dGlsL0dyb3VwQnlVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1267 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [58e7f10...29e5d20](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772591383



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -294,15 +300,17 @@ public void deleteSegmentFile() {
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
     long now = now();
+    long consumeEndTime = _consumeEndTime;
     switch (_state) {
       case INITIAL_CONSUMING:
         // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet.
         // We need to consume as much data as available, until we have either reached the max number of rows or
         // the max time we are allowed to consume.
-        if (now >= _consumeEndTime) {
+        if (now >= consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
             _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
-            _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+            CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime, consumeEndTime
+                + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS));

Review comment:
       There is a single consuming thread, so there should be no concurrency issue. I'd suggest keeping the existing code to avoid the unnecessary overhead of using an updater. We can add an annotation to disable the IDE warning about updating volatile fields, and a comment about the single-threaded consuming behavior.
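For readers unfamiliar with the updater call in the diff, here is a small sketch of what `compareAndSet` does to a volatile `long` field. The class, field and method names are hypothetical. With a single writer the compare-and-set always succeeds, which is the premise of the comment above; it only fails if the field changed between the read and the update.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class ConsumeEndTimeSketch {
  private static final AtomicLongFieldUpdater<ConsumeEndTimeSketch> END_TIME_UPDATER =
      AtomicLongFieldUpdater.newUpdater(ConsumeEndTimeSketch.class, "_endTimeMs");

  private volatile long _endTimeMs;

  ConsumeEndTimeSketch(long endTimeMs) {
    _endTimeMs = endTimeMs;
  }

  /** Extends the end time only if it still equals the value the caller read earlier. */
  boolean extendIfUnchanged(long observedEndTimeMs, long extensionHours) {
    return END_TIME_UPDATER.compareAndSet(this, observedEndTimeMs,
        observedEndTimeMs + TimeUnit.HOURS.toMillis(extensionHours));
  }

  long endTimeMs() {
    return _endTimeMs;
  }

  public static void main(String[] args) {
    ConsumeEndTimeSketch segment = new ConsumeEndTimeSketch(1_000L);
    long observed = segment.endTimeMs();
    System.out.println(segment.extendIfUnchanged(observed, 1)); // value unchanged since the read
    System.out.println(segment.extendIfUnchanged(observed, 1)); // stale value, no second extension
  }
}
```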






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8e19d50) into [master](https://codecov.io/gh/apache/pinot/commit/6037cac427c310c298d78b5fbcd11b6757982f73?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6037cac) will **decrease** coverage by `0.84%`.
   > The diff coverage is `88.57%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7927      +/-   ##
   ============================================
   - Coverage     71.15%   70.30%   -0.85%     
   - Complexity     4111     4114       +3     
   ============================================
     Files          1593     1593              
     Lines         82365    82377      +12     
     Branches      12270    12275       +5     
   ============================================
   - Hits          58609    57918     -691     
   - Misses        19806    20501     +695     
   - Partials       3950     3958       +8     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.09% <77.14%> (-0.01%)` | :arrow_down: |
   | integration2 | `?` | |
   | unittests1 | `68.08% <56.25%> (+0.01%)` | :arrow_up: |
   | unittests2 | `14.33% <40.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `92.30% <ø> (ø)` | |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `85.00% <84.61%> (-1.67%)` | :arrow_down: |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `71.17% <100.00%> (-0.66%)` | :arrow_down: |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `92.30% <100.00%> (+0.64%)` | :arrow_up: |
   | [...t/core/plan/StreamingInstanceResponsePlanNode.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9wbGFuL1N0cmVhbWluZ0luc3RhbmNlUmVzcG9uc2VQbGFuTm9kZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/operator/streaming/StreamingResponseUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9zdHJlYW1pbmcvU3RyZWFtaW5nUmVzcG9uc2VVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ager/realtime/PeerSchemeSplitSegmentCommitter.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUGVlclNjaGVtZVNwbGl0U2VnbWVudENvbW1pdHRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/pinot/common/utils/grpc/GrpcQueryClient.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvZ3JwYy9HcnBjUXVlcnlDbGllbnQuamF2YQ==) | `0.00% <0.00%> (-94.74%)` | :arrow_down: |
   | ... and [89 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6037cac...8e19d50](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r773005687



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
##########
@@ -18,61 +18,53 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
 
   @Override
-  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
-      int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    final Iterable<ConsumerRecord<String, Bytes>> messageAndOffsetIterable =
-        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
-    return new KafkaMessageBatch(messageAndOffsetIterable);
-  }
-
-  private Iterable<ConsumerRecord<String, Bytes>> buildOffsetFilteringIterable(
-      final List<ConsumerRecord<String, Bytes>> messageAndOffsets, final long startOffset, final long endOffset) {
-    return Iterables.filter(messageAndOffsets, input -> {
-      // Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
-      return input != null && input.value() != null && input.offset() >= startOffset && (endOffset > input.offset()
-          || endOffset == -1);
-    });
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    super.close();
+    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
+    List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());
+    long lastOffset = startOffset;
+    for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      if (messageAndOffset != null) {
+        Bytes message = messageAndOffset.value();
+        long offset = messageAndOffset.offset();
+        if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
+          if (message != null) {
+            filtered.add(new MessageAndOffset(message.get(), offset));
+          }
+          lastOffset = offset;
+        }
+      }
+    }
+    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);

Review comment:
       Inspecting the Kafka client source code, if the null check on line 57 ever failed, so would Kafka client internals (e.g. see `ConsumerRecords#count`). This check doesn't have a test case, so I cannot see the basis for it.






[GitHub] [pinot] sajjad-moradi commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772769012



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
##########
@@ -30,19 +27,26 @@
 
 public class KafkaMessageBatch implements MessageBatch<byte[]> {
 
-  private List<MessageAndOffset> _messageList = new ArrayList<>();
+  private final List<MessageAndOffset> _messageList;
+  private final int _unfilteredMessageCount;
+  private final long _lastOffset;
 
-  public KafkaMessageBatch(Iterable<ConsumerRecord<String, Bytes>> iterable) {
-    for (ConsumerRecord<String, Bytes> record : iterable) {
-      _messageList.add(new MessageAndOffset(record.value().get(), record.offset()));
-    }
+  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<MessageAndOffset> batch) {

Review comment:
       Please add javadoc for the arguments and explain why they are needed.

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
##########
@@ -82,6 +88,13 @@ default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index
     return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
   }
 
+  /**
+   * @return last offset in the batch
+   */
+  default StreamPartitionMsgOffset getLastOffset() {
+    return getNextStreamParitionMsgOffsetAtIndex(getMessageCount() - 1);

Review comment:
       `getMessageCount() - 1` is not a valid index. Since this method is overridden for kafka 2.0 and the other streams are not supposed to use this method, at least for now, I believe we should throw IllegalStateException here.
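
       A minimal sketch of the suggestion, assuming a simplified stand-in for the interface (the `Batch` type and its method names below are hypothetical, not the real `MessageBatch` SPI). It shows that an empty batch makes `getMessageCount() - 1` evaluate to `-1`, and what a default that throws instead could look like:

```java
import java.util.Collections;
import java.util.List;

public class LastOffsetSketch {
  // Hypothetical, simplified stand-in for MessageBatch; not the real Pinot SPI.
  public interface Batch {
    int getMessageCount();

    long getNextOffsetAtIndex(int index);

    // Default for streams that do not track the last offset themselves:
    // fail loudly instead of indexing with getMessageCount() - 1.
    default long getLastOffset() {
      throw new IllegalStateException("getLastOffset() is not supported by this batch type");
    }
  }

  // Builds a batch backed by a list of offsets that relies on the default above.
  public static Batch of(List<Long> offsets) {
    return new Batch() {
      @Override
      public int getMessageCount() {
        return offsets.size();
      }

      @Override
      public long getNextOffsetAtIndex(int index) {
        return offsets.get(index) + 1;
      }
    };
  }

  // Returns true if the default implementation throws IllegalStateException.
  public static boolean defaultThrows() {
    try {
      of(Collections.emptyList()).getLastOffset();
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Batch empty = of(Collections.emptyList());
    // The index the original default would have used for an empty batch.
    System.out.println(empty.getMessageCount() - 1);
    System.out.println(defaultThrows());
  }
}
```

       Implementations that do know their last offset (Kafka 2.0 in this PR) would override `getLastOffset()`; everything else fails fast rather than silently reading index `-1`.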

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
##########
@@ -18,61 +18,53 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
 
   @Override
-  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
-      int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    final Iterable<ConsumerRecord<String, Bytes>> messageAndOffsetIterable =
-        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
-    return new KafkaMessageBatch(messageAndOffsetIterable);
-  }
-
-  private Iterable<ConsumerRecord<String, Bytes>> buildOffsetFilteringIterable(
-      final List<ConsumerRecord<String, Bytes>> messageAndOffsets, final long startOffset, final long endOffset) {
-    return Iterables.filter(messageAndOffsets, input -> {
-      // Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
-      return input != null && input.value() != null && input.offset() >= startOffset && (endOffset > input.offset()
-          || endOffset == -1);
-    });
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    super.close();
+    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
+    List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());
+    long lastOffset = startOffset;
+    for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      if (messageAndOffset != null) {
+        Bytes message = messageAndOffset.value();
+        long offset = messageAndOffset.offset();
+        if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
+          if (message != null) {
+            filtered.add(new MessageAndOffset(message.get(), offset));
+          }
+          lastOffset = offset;
+        }
+      }
+    }
+    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);

Review comment:
       `lastOffset` is used only in the `getOffsetOfNextBatch` method. I suggest we rename it to `offsetOfNextBatch` here and also in the `KafkaMessageBatch` class.
   Also, the last offset is used only when all messages are filtered out. So `offsetOfNextBatch`, in that case, will always be `startOffset + 1`, which is inefficient: if there are N messages in the batch, we'll fetch N-1 null messages again. We can use `startOffset + N` as the value for `offsetOfNextBatch`.
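
       One possible shape for this, as a sketch only: `Record` and `Batch` are hypothetical stand-ins for `ConsumerRecord` and `KafkaMessageBatch`, and advancing to "offset of the last in-range record plus one" is an assumption of the sketch, not necessarily what was merged. The `endOffset == -1` special case from the diff is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilteredBatchSketch {
  // Hypothetical stand-in for a Kafka ConsumerRecord: an offset plus a nullable value (null = tombstone).
  public static final class Record {
    public final long offset;
    public final byte[] value;

    public Record(long offset, byte[] value) {
      this.offset = offset;
      this.value = value;
    }
  }

  // Hypothetical stand-in for KafkaMessageBatch.
  public static final class Batch {
    public final int unfilteredMessageCount;
    public final long offsetOfNextBatch;
    public final List<Record> messages;

    public Batch(int unfilteredMessageCount, long offsetOfNextBatch, List<Record> messages) {
      this.unfilteredMessageCount = unfilteredMessageCount;
      this.offsetOfNextBatch = offsetOfNextBatch;
      this.messages = messages;
    }
  }

  public static Batch filter(List<Record> records, long startOffset, long endOffset) {
    List<Record> kept = new ArrayList<>(records.size());
    // If nothing is in range, the next fetch starts where this one did.
    long offsetOfNextBatch = startOffset;
    for (Record record : records) {
      long offset = record.offset;
      if (offset >= startOffset && offset < endOffset) {
        if (record.value != null) {
          kept.add(record);
        }
        // Advance past every in-range record, including tombstones.
        offsetOfNextBatch = offset + 1;
      }
    }
    return new Batch(records.size(), offsetOfNextBatch, kept);
  }

  public static void main(String[] args) {
    List<Record> tombstones =
        Arrays.asList(new Record(10, null), new Record(11, null), new Record(12, null));
    Batch batch = filter(tombstones, 10, Long.MAX_VALUE);
    System.out.println(batch.messages.size() + " kept, next offset " + batch.offsetOfNextBatch);
  }
}
```

       With three tombstones at offsets 10 to 12, nothing is kept but the next fetch starts at 13, so the consumer does not re-read the same nulls.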






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821867



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
##########
@@ -63,6 +63,11 @@
   private final boolean _enableLeadControllerResource = RANDOM.nextBoolean();
   private final long _startTime = System.currentTimeMillis();
 
+  @Override
+  protected boolean injectTombstones() {
+    return true;

Review comment:
       Set this to false to make the test pass at eb6800da96a44e6f5125097cc99a368c2f8f8847 






[GitHub] [pinot] Jackie-Jiang merged pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged pull request #7927:
URL: https://github.com/apache/pinot/pull/7927


   




[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771822032



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
##########
@@ -18,61 +18,53 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
 
   @Override
-  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
-      int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
+      int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    final Iterable<ConsumerRecord<String, Bytes>> messageAndOffsetIterable =
-        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
-    return new KafkaMessageBatch(messageAndOffsetIterable);
-  }
-
-  private Iterable<ConsumerRecord<String, Bytes>> buildOffsetFilteringIterable(
-      final List<ConsumerRecord<String, Bytes>> messageAndOffsets, final long startOffset, final long endOffset) {
-    return Iterables.filter(messageAndOffsets, input -> {
-      // Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
-      return input != null && input.value() != null && input.offset() >= startOffset && (endOffset > input.offset()
-          || endOffset == -1);
-    });
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    super.close();
+    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
+    List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());

Review comment:
       Note that a list was being materialised anyway in `KafkaMessageBatch`; it's just easier to do it here because we can also capture the last offset. This is likely more efficient than using `Iterables.filter` anyway.






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772595460



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -294,15 +300,17 @@ public void deleteSegmentFile() {
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
     long now = now();
+    long consumeEndTime = _consumeEndTime;
     switch (_state) {
       case INITIAL_CONSUMING:
         // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet.
         // We need to consume as much data as available, until we have either reached the max number of rows or
         // the max time we are allowed to consume.
-        if (now >= _consumeEndTime) {
+        if (now >= consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
             _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
-            _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+            CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime, consumeEndTime
+                + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS));

Review comment:
       The cost of the updater (an apparently uncontended CAS) is of a similar magnitude to a volatile write.
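
       For readers unfamiliar with the pattern, a minimal sketch using `java.util.concurrent.atomic.AtomicLongFieldUpdater`. The class name and the `extend` method are hypothetical; only the updater/CAS shape mirrors the diff above:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public class ConsumeEndTimeSketch {
  // The updater is created once per class and refers to the field by name.
  private static final AtomicLongFieldUpdater<ConsumeEndTimeSketch> CONSUME_END_TIME_UPDATER =
      AtomicLongFieldUpdater.newUpdater(ConsumeEndTimeSketch.class, "_consumeEndTime");

  // AtomicLongFieldUpdater requires the target field to be a volatile long.
  private volatile long _consumeEndTime;

  public ConsumeEndTimeSketch(long consumeEndTime) {
    _consumeEndTime = consumeEndTime;
  }

  // Extends the deadline only if nobody else changed it since we read it.
  public boolean extend(long hours) {
    long consumeEndTime = _consumeEndTime;
    return CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime,
        consumeEndTime + TimeUnit.HOURS.toMillis(hours));
  }

  public long getConsumeEndTime() {
    return _consumeEndTime;
  }

  public static void main(String[] args) {
    ConsumeEndTimeSketch sketch = new ConsumeEndTimeSketch(1_000L);
    System.out.println(sketch.extend(1) + " " + sketch.getConsumeEndTime());
  }
}
```

       Unlike `_consumeEndTime += ...` on a plain field, the read-modify-write here cannot silently overwrite a concurrent update: if the value changed in between, `compareAndSet` returns false and leaves the field alone.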






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8e19d50) into [master](https://codecov.io/gh/apache/pinot/commit/6037cac427c310c298d78b5fbcd11b6757982f73?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6037cac) will **decrease** coverage by `0.00%`.
   > The diff coverage is `88.57%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7927      +/-   ##
   ============================================
   - Coverage     71.15%   71.14%   -0.01%     
   - Complexity     4111     4114       +3     
   ============================================
     Files          1593     1593              
     Lines         82365    82377      +12     
     Branches      12270    12275       +5     
   ============================================
   + Hits          58609    58610       +1     
   - Misses        19806    19813       +7     
   - Partials       3950     3954       +4     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.09% <77.14%> (-0.01%)` | :arrow_down: |
   | integration2 | `27.59% <60.00%> (+<0.01%)` | :arrow_up: |
   | unittests1 | `68.08% <56.25%> (+0.01%)` | :arrow_up: |
   | unittests2 | `14.33% <40.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `92.30% <ø> (ø)` | |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `85.00% <84.61%> (-1.67%)` | :arrow_down: |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `72.80% <100.00%> (+0.96%)` | :arrow_up: |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `92.30% <100.00%> (+0.64%)` | :arrow_up: |
   | [...core/query/executor/ServerQueryExecutorV1Impl.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9leGVjdXRvci9TZXJ2ZXJRdWVyeUV4ZWN1dG9yVjFJbXBsLmphdmE=) | `83.33% <0.00%> (-4.42%)` | :arrow_down: |
   | [...pinot/core/query/request/context/TimerContext.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9yZXF1ZXN0L2NvbnRleHQvVGltZXJDb250ZXh0LmphdmE=) | `91.66% <0.00%> (-4.17%)` | :arrow_down: |
   | [.../pinot/core/query/scheduler/PriorityScheduler.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9zY2hlZHVsZXIvUHJpb3JpdHlTY2hlZHVsZXIuamF2YQ==) | `80.82% <0.00%> (-2.74%)` | :arrow_down: |
   | [...core/startree/operator/StarTreeFilterOperator.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zdGFydHJlZS9vcGVyYXRvci9TdGFyVHJlZUZpbHRlck9wZXJhdG9yLmphdmE=) | `85.31% <0.00%> (-2.10%)` | :arrow_down: |
   | ... and [13 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6037cac...8e19d50](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] sajjad-moradi commented on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-999788784


   @richardstartin I don't want to get into details here. My point was that one PR should focus on one thing, not more!




[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821729



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -401,16 +409,12 @@ protected boolean consumeLoop()
             .fetchMessages(_currentOffset, null, _partitionLevelStreamConfig.getFetchTimeoutMillis());
         _endOfPartitionGroup = messageBatch.isEndOfPartitionGroup();
         _consecutiveErrorCount = 0;
-      } catch (TimeoutException e) {
-        handleTransientStreamErrors(e);
-        continue;
-      } catch (TransientConsumerException e) {
-        handleTransientStreamErrors(e);
-        continue;
       } catch (PermanentConsumerException e) {
         _segmentLogger.warn("Permanent exception from stream when fetching messages, stopping consumption", e);
         throw e;
       } catch (Exception e) {
+        // all exceptions but PermanentConsumerException are handled the same way
+        // can be a TimeoutException or TransientConsumerException routinely

Review comment:
       Collapsed identical catch blocks for the sake of the person reading the code.
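
       A reduced sketch of the resulting control flow, assuming stand-in exception classes and a hypothetical `Fetcher` interface (the real classes live in the Pinot stream SPI and the real loop does more than return `null`):

```java
import java.util.concurrent.TimeoutException;

public class CatchSketch {
  // Hypothetical stand-ins for the Pinot exception types named in the diff.
  public static class PermanentConsumerException extends RuntimeException {
    public PermanentConsumerException(String message) {
      super(message);
    }
  }

  public static class TransientConsumerException extends RuntimeException {
    public TransientConsumerException(String message) {
      super(message);
    }
  }

  // Hypothetical fetch abstraction, only here to make the sketch self-contained.
  public interface Fetcher {
    String fetch() throws Exception;
  }

  // Returns the fetched value, or null when the failure is treated as transient.
  public static String fetchOnce(Fetcher fetcher) {
    try {
      return fetcher.fetch();
    } catch (PermanentConsumerException e) {
      // The only case that must stop consumption: rethrow.
      throw e;
    } catch (Exception e) {
      // TimeoutException, TransientConsumerException and anything else share one path.
      return null;
    }
  }

  // Returns true if a permanent failure propagates to the caller.
  public static boolean rethrowsPermanent() {
    try {
      fetchOnce(() -> {
        throw new PermanentConsumerException("stop");
      });
      return false;
    } catch (PermanentConsumerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(fetchOnce(() -> "batch"));
    System.out.println(fetchOnce(() -> {
      throw new TimeoutException();
    }));
    System.out.println(rethrowsPermanent());
  }
}
```

       The more specific `catch` has to come first; with that ordering a single `catch (Exception e)` covers every case the two removed blocks handled.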






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771822197



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamPartitionMsgOffset.java
##########
@@ -40,7 +40,7 @@
  * versions of the stream implementation
  */
 @InterfaceStability.Evolving
-public interface StreamPartitionMsgOffset extends Comparable {
+public interface StreamPartitionMsgOffset extends Comparable<StreamPartitionMsgOffset> {

Review comment:
       The raw `Comparable` caused a lot of warnings, which make code harder to read. Parameterising it also helps the compiler prevent heap pollution bugs.
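
       To illustrate with a sketch (`LongOffset` is a hypothetical stand-in, not the real `LongMsgOffset`): through a raw `Comparable` reference the compiler accepts a comparison against an unrelated type and the mistake only surfaces at runtime, whereas the parameterised call would be rejected at compile time.

```java
public class ComparableSketch {
  // Hypothetical offset type implementing the parameterised interface.
  public static final class LongOffset implements Comparable<LongOffset> {
    private final long _offset;

    public LongOffset(long offset) {
      _offset = offset;
    }

    @Override
    public int compareTo(LongOffset other) {
      return Long.compare(_offset, other._offset);
    }
  }

  // Through a raw reference the compiler only warns; the bad call fails at runtime.
  @SuppressWarnings({"unchecked", "rawtypes"})
  public static boolean rawCompareFailsAtRuntime() {
    Comparable raw = new LongOffset(1);
    try {
      raw.compareTo("not an offset");
      return false;
    } catch (ClassCastException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    LongOffset a = new LongOffset(1);
    LongOffset b = new LongOffset(2);
    System.out.println(a.compareTo(b) < 0);
    // a.compareTo("not an offset") would not compile against Comparable<LongOffset>.
    System.out.println(rawCompareFailsAtRuntime());
  }
}
```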






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771822223



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
##########
@@ -31,11 +31,17 @@
 @InterfaceStability.Stable
 public interface MessageBatch<T> {
   /**
-   *
-   * @return number of messages returned from the stream
+   * @return number of available messages
    */
   int getMessageCount();
 
+  /**
+   * @return number of messages returned from the stream
+   */
+  default int getUnfilteredMessageCount() {
+    return getMessageCount();

Review comment:
       Implemented for Kafka 2.0 only.






[GitHub] [pinot] codecov-commenter commented on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8e19d50) into [master](https://codecov.io/gh/apache/pinot/commit/6037cac427c310c298d78b5fbcd11b6757982f73?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6037cac) will **decrease** coverage by `56.82%`.
   > The diff coverage is `40.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7927       +/-   ##
   =============================================
   - Coverage     71.15%   14.33%   -56.83%     
   + Complexity     4111       80     -4031     
   =============================================
     Files          1593     1548       -45     
     Lines         82365    80506     -1859     
     Branches      12270    12072      -198     
   =============================================
   - Hits          58609    11540    -47069     
   - Misses        19806    68108    +48302     
   + Partials       3950      858     -3092     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `14.33% <40.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (-71.84%)` | :arrow_down: |
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `0.00% <ø> (-92.31%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `53.84% <66.66%> (-37.83%)` | :arrow_down: |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `80.00% <76.92%> (-6.67%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/core/data/table/Key.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL0tleS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/spi/utils/BooleanUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQm9vbGVhblV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/data/table/Record.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1JlY29yZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/util/GroupByUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS91dGlsL0dyb3VwQnlVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1267 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6037cac...8e19d50](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
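
As a reading aid for the coverage diff table in the report above: the `Lines` row equals `Hits + Misses + Partials`, and the `Coverage` row matches `Hits / Lines` truncated to two decimal places. The sketch below recomputes both `Coverage` values from that table. The class and method names are hypothetical, and truncation (rather than rounding) is an assumption inferred from these numbers, not documented Codecov behavior.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper, not part of Pinot or Codecov.
class CoverageCheck {

  // Coverage as a percentage: hits / (hits + misses + partials),
  // truncated to two decimals (assumed from the figures in the report).
  static BigDecimal coveragePercent(long hits, long misses, long partials) {
    long lines = hits + misses + partials;
    return BigDecimal.valueOf(hits)
        .multiply(BigDecimal.valueOf(100))
        .divide(BigDecimal.valueOf(lines), 2, RoundingMode.DOWN);
  }

  public static void main(String[] args) {
    // master column: Hits 58609, Misses 19806, Partials 3950 (Lines 82365)
    System.out.println(coveragePercent(58609, 19806, 3950)); // prints 71.15
    // #7927 column: Hits 11540, Misses 68108, Partials 858 (Lines 80506)
    System.out.println(coveragePercent(11540, 68108, 858));  // prints 14.33
  }
}
```

The large drop in that first report coincides with the `integration1`, `integration2` and `unittests1` flags showing `?` (missing data), so it likely reflects incomplete uploads rather than the change itself.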
   




[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8e19d50) into [master](https://codecov.io/gh/apache/pinot/commit/6037cac427c310c298d78b5fbcd11b6757982f73?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6037cac) will **decrease** coverage by `6.39%`.
   > The diff coverage is `65.71%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7927      +/-   ##
   ============================================
   - Coverage     71.15%   64.76%   -6.40%     
   - Complexity     4111     4114       +3     
   ============================================
     Files          1593     1548      -45     
     Lines         82365    80506    -1859     
     Branches      12270    12072     -198     
   ============================================
   - Hits          58609    52137    -6472     
   - Misses        19806    24657    +4851     
   + Partials       3950     3712     -238     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `68.08% <56.25%> (+0.01%)` | :arrow_up: |
   | unittests2 | `14.33% <40.00%> (-0.01%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `92.30% <ø> (ø)` | |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `47.63% <64.28%> (-24.21%)` | :arrow_down: |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `53.84% <66.66%> (-37.83%)` | :arrow_down: |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `80.00% <76.92%> (-6.67%)` | :arrow_down: |
   | [...a/org/apache/pinot/common/metrics/MinionMeter.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/pinot/common/metrics/ControllerMeter.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyTWV0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/BrokerQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Ccm9rZXJRdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/MinionQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25RdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [373 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [6037cac...8e19d50](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (29e5d20) into [master](https://codecov.io/gh/apache/pinot/commit/58e7f10c7a0bb8e9137377e913797d54f9b89f8a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (58e7f10) will **increase** coverage by `0.03%`.
   > The diff coverage is `84.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7927      +/-   ##
   ============================================
   + Coverage     71.08%   71.11%   +0.03%     
   - Complexity     4109     4116       +7     
   ============================================
     Files          1593     1593              
     Lines         82373    82379       +6     
     Branches      12269    12274       +5     
   ============================================
   + Hits          58555    58586      +31     
   + Misses        19867    19852      -15     
   + Partials       3951     3941      -10     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.01% <84.00%> (+0.01%)` | :arrow_up: |
   | integration2 | `27.64% <60.00%> (+0.07%)` | :arrow_up: |
   | unittests1 | `68.11% <0.00%> (+0.05%)` | :arrow_up: |
   | unittests2 | `14.33% <56.00%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `92.30% <ø> (ø)` | |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `85.00% <84.61%> (-1.67%)` | :arrow_down: |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `73.35% <100.00%> (+1.53%)` | :arrow_up: |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `92.30% <100.00%> (+0.64%)` | :arrow_up: |
   | [...ller/helix/core/minion/TaskTypeMetricsUpdater.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL21pbmlvbi9UYXNrVHlwZU1ldHJpY3NVcGRhdGVyLmphdmE=) | `80.00% <0.00%> (-20.00%)` | :arrow_down: |
   | [...nction/DistinctCountBitmapAggregationFunction.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9hZ2dyZWdhdGlvbi9mdW5jdGlvbi9EaXN0aW5jdENvdW50Qml0bWFwQWdncmVnYXRpb25GdW5jdGlvbi5qYXZh) | `47.66% <0.00%> (-9.85%)` | :arrow_down: |
   | [.../org/apache/pinot/core/startree/StarTreeUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9zdGFydHJlZS9TdGFyVHJlZVV0aWxzLmphdmE=) | `69.89% <0.00%> (-2.16%)` | :arrow_down: |
   | [...e/pinot/segment/local/io/util/PinotDataBitSet.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9pby91dGlsL1Bpbm90RGF0YUJpdFNldC5qYXZh) | `95.62% <0.00%> (-1.46%)` | :arrow_down: |
   | ... and [24 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [58e7f10...29e5d20](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (29e5d20) into [master](https://codecov.io/gh/apache/pinot/commit/58e7f10c7a0bb8e9137377e913797d54f9b89f8a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (58e7f10) will **decrease** coverage by `6.29%`.
   > The diff coverage is `56.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7927      +/-   ##
   ============================================
   - Coverage     71.08%   64.78%   -6.30%     
   - Complexity     4109     4116       +7     
   ============================================
     Files          1593     1548      -45     
     Lines         82373    80508    -1865     
     Branches      12269    12072     -197     
   ============================================
   - Hits          58555    52161    -6394     
   - Misses        19867    24641    +4774     
   + Partials       3951     3706     -245     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `68.11% <0.00%> (+0.05%)` | :arrow_up: |
   | unittests2 | `14.33% <56.00%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `46.56% <0.00%> (-25.25%)` | :arrow_down: |
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `92.30% <ø> (ø)` | |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `53.84% <66.66%> (-37.83%)` | :arrow_down: |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `80.00% <76.92%> (-6.67%)` | :arrow_down: |
   | [...a/org/apache/pinot/common/metrics/MinionMeter.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/pinot/common/metrics/ControllerMeter.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyTWV0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/BrokerQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Ccm9rZXJRdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/MinionQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25RdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [373 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [58e7f10...29e5d20](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821708



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -294,15 +300,17 @@ public void deleteSegmentFile() {
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
     long now = now();
+    long consumeEndTime = _consumeEndTime;
     switch (_state) {
       case INITIAL_CONSUMING:
         // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet.
         // We need to consume as much data as available, until we have either reached the max number of rows or
         // the max time we are allowed to consume.
-        if (now >= _consumeEndTime) {
+        if (now >= consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
             _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
-            _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+            CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime, consumeEndTime
+                + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS));

Review comment:
       Concurrency bug: two threads can both set the end time. The possible outcomes are:
   1. no change to `_consumeEndTime`
   2. `_consumeEndTime` incremented by `TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS)`
   3. `_consumeEndTime` incremented by `2 * TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS)`






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821460



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -294,15 +300,17 @@ public void deleteSegmentFile() {
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
     long now = now();
+    long consumeEndTime = _consumeEndTime;

Review comment:
       Concurrency bug: inconsistent reads of a volatile variable are possible, because each access to `_consumeEndTime` within the method is a separate read.
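
The hazard described in this comment can be sketched outside of Pinot. The class below is hypothetical (only the field name `_consumeEndTime` is borrowed from the diff) and is an illustration rather than the actual `LLRealtimeSegmentDataManager` code. The racy method reads the volatile field twice, so a concurrent writer can make the comparison and the subtraction see different values; the second method copies the field into a local once, as the added line in the diff does.

```java
final class SnapshotReadSketch {
  // Hypothetical stand-in for a deadline that another thread may move.
  private volatile long _consumeEndTime;

  SnapshotReadSketch(long consumeEndTime) {
    _consumeEndTime = consumeEndTime;
  }

  // Racy shape: two separate volatile reads. A writer running between them
  // means the comparison and the subtraction can use different values.
  long remainingMillisRacy(long now) {
    if (now >= _consumeEndTime) {
      return 0L;
    }
    return _consumeEndTime - now;
  }

  // Snapshot shape: one volatile read, then only the local copy is used.
  long remainingMillis(long now) {
    long consumeEndTime = _consumeEndTime;
    return now >= consumeEndTime ? 0L : consumeEndTime - now;
  }

  public static void main(String[] args) {
    SnapshotReadSketch sketch = new SnapshotReadSketch(1_000L);
    System.out.println(sketch.remainingMillis(400L));   // before the deadline
    System.out.println(sketch.remainingMillis(1_500L)); // past the deadline
  }
}
```

The snapshot does not stop other threads from changing the field; it only guarantees that one invocation reasons about a single value.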






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821708



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -294,15 +300,17 @@ public void deleteSegmentFile() {
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
     long now = now();
+    long consumeEndTime = _consumeEndTime;
     switch (_state) {
       case INITIAL_CONSUMING:
         // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet.
         // We need to consume as much data as available, until we have either reached the max number of rows or
         // the max time we are allowed to consume.
-        if (now >= _consumeEndTime) {
+        if (now >= consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
             _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
-            _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+            CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime, consumeEndTime
+                + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS));

Review comment:
       Concurrency bug: two threads can both set the end time. The possible outcomes are:
   1. `_consumeEndTime` incremented by `TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS)`
   2. `_consumeEndTime` incremented by `2 * TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS)`
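
A minimal sketch of the compare-and-set pattern shown in the diff, under stated assumptions: the class name, the accessor, and the extension value of 1 hour are hypothetical, and declaring `CONSUME_END_TIME_UPDATER` as an `AtomicLongFieldUpdater` is an assumption, since the declaration is not part of the quoted hunk. The point is that a thread holding a stale snapshot of the end time fails the compare-and-set instead of adding a second extension.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

final class EndTimeExtensionSketch {
  // Assumed value, for illustration only.
  private static final int TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS = 1;

  // Assumed declaration: the field must be a non-static volatile long.
  private static final AtomicLongFieldUpdater<EndTimeExtensionSketch> CONSUME_END_TIME_UPDATER =
      AtomicLongFieldUpdater.newUpdater(EndTimeExtensionSketch.class, "_consumeEndTime");

  private volatile long _consumeEndTime;

  EndTimeExtensionSketch(long consumeEndTime) {
    _consumeEndTime = consumeEndTime;
  }

  long getConsumeEndTime() {
    return _consumeEndTime;
  }

  // Extends the end time only if it still equals the value the caller observed.
  boolean extendFrom(long observedEndTime) {
    return CONSUME_END_TIME_UPDATER.compareAndSet(this, observedEndTime,
        observedEndTime + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS));
  }

  public static void main(String[] args) {
    EndTimeExtensionSketch sketch = new EndTimeExtensionSketch(1_000L);
    long observed = sketch.getConsumeEndTime(); // both "threads" saw this value
    boolean first = sketch.extendFrom(observed);  // wins the race
    boolean second = sketch.extendFrom(observed); // stale snapshot, rejected
    System.out.println(first + " " + second + " " + sketch.getConsumeEndTime());
    // prints "true false 3601000"
  }
}
```

With a plain `+=` on the volatile field, the two calls would be independent read-modify-write sequences and could apply the extension once or twice; here the second call leaves the field untouched.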






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3e3184b) into [master](https://codecov.io/gh/apache/pinot/commit/71fefe2f16f15c112944db848f9a72e6e6e83ce6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (71fefe2) will **decrease** coverage by `33.05%`.
   > The diff coverage is `84.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7927       +/-   ##
   =============================================
   - Coverage     71.07%   38.02%   -33.06%     
   + Complexity     4112       80     -4032     
   =============================================
     Files          1593     1593               
     Lines         82372    82378        +6     
     Branches      12270    12275        +5     
   =============================================
   - Hits          58545    31322    -27223     
   - Misses        19872    48664    +28792     
   + Partials       3955     2392     -1563     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.01% <84.00%> (-0.02%)` | :arrow_down: |
   | integration2 | `27.55% <60.00%> (-0.04%)` | :arrow_down: |
   | unittests1 | `?` | |
   | unittests2 | `14.33% <56.00%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `0.00% <ø> (-92.31%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `85.00% <84.61%> (-1.67%)` | :arrow_down: |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `60.29% <100.00%> (-12.06%)` | :arrow_down: |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `92.30% <100.00%> (+0.64%)` | :arrow_up: |
   | [.../java/org/apache/pinot/spi/utils/BooleanUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQm9vbGVhblV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/config/table/FSTType.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL0ZTVFR5cGUuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/data/MetricFieldSpec.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvZGF0YS9NZXRyaWNGaWVsZFNwZWMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/pinot/spi/utils/BigDecimalUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQmlnRGVjaW1hbFV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [892 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [71fefe2...3e3184b](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (29e5d20) into [master](https://codecov.io/gh/apache/pinot/commit/58e7f10c7a0bb8e9137377e913797d54f9b89f8a?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (58e7f10) will **decrease** coverage by `0.82%`.
   > The diff coverage is `84.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7927      +/-   ##
   ============================================
   - Coverage     71.08%   70.25%   -0.83%     
   - Complexity     4109     4116       +7     
   ============================================
     Files          1593     1593              
     Lines         82373    82379       +6     
     Branches      12269    12274       +5     
   ============================================
   - Hits          58555    57876     -679     
   - Misses        19867    20556     +689     
   + Partials       3951     3947       -4     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `29.01% <84.00%> (+0.01%)` | :arrow_up: |
   | integration2 | `?` | |
   | unittests1 | `68.11% <0.00%> (+0.05%)` | :arrow_up: |
   | unittests2 | `14.33% <56.00%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `92.30% <ø> (ø)` | |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `85.00% <84.61%> (-1.67%)` | :arrow_down: |
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `71.19% <100.00%> (-0.62%)` | :arrow_down: |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `92.30% <100.00%> (+0.64%)` | :arrow_up: |
   | [...t/core/plan/StreamingInstanceResponsePlanNode.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9wbGFuL1N0cmVhbWluZ0luc3RhbmNlUmVzcG9uc2VQbGFuTm9kZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ore/operator/streaming/StreamingResponseUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9vcGVyYXRvci9zdHJlYW1pbmcvU3RyZWFtaW5nUmVzcG9uc2VVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ager/realtime/PeerSchemeSplitSegmentCommitter.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUGVlclNjaGVtZVNwbGl0U2VnbWVudENvbW1pdHRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...pache/pinot/common/utils/grpc/GrpcQueryClient.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvZ3JwYy9HcnBjUXVlcnlDbGllbnQuamF2YQ==) | `0.00% <0.00%> (-94.74%)` | :arrow_down: |
   | ... and [98 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [58e7f10...29e5d20](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3e3184b) into [master](https://codecov.io/gh/apache/pinot/commit/71fefe2f16f15c112944db848f9a72e6e6e83ce6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (71fefe2) will **decrease** coverage by `34.91%`.
   > The diff coverage is `60.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7927       +/-   ##
   =============================================
   - Coverage     71.07%   36.15%   -34.92%     
   + Complexity     4112       80     -4032     
   =============================================
     Files          1593     1593               
     Lines         82372    82378        +6     
     Branches      12270    12275        +5     
   =============================================
   - Hits          58545    29785    -28760     
   - Misses        19872    50237    +30365     
   + Partials       3955     2356     -1599     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `27.55% <60.00%> (-0.04%)` | :arrow_down: |
   | unittests1 | `?` | |
   | unittests2 | `14.33% <56.00%> (-0.03%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `58.27% <0.00%> (-14.08%)` | :arrow_down: |
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `0.00% <ø> (-92.31%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `80.00% <76.92%> (-6.67%)` | :arrow_down: |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `84.61% <83.33%> (-7.06%)` | :arrow_down: |
   | [.../java/org/apache/pinot/spi/utils/BooleanUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQm9vbGVhblV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/config/table/FSTType.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvY29uZmlnL3RhYmxlL0ZTVFR5cGUuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ava/org/apache/pinot/spi/data/MetricFieldSpec.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvZGF0YS9NZXRyaWNGaWVsZFNwZWMuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...va/org/apache/pinot/spi/utils/BigDecimalUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQmlnRGVjaW1hbFV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [964 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [71fefe2...3e3184b](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772619331



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -294,15 +300,17 @@ public void deleteSegmentFile() {
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
     long now = now();
+    long consumeEndTime = _consumeEndTime;
     switch (_state) {
       case INITIAL_CONSUMING:
         // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet.
         // We need to consume as much data as available, until we have either reached the max number of rows or
         // the max time we are allowed to consume.
-        if (now >= _consumeEndTime) {
+        if (now >= consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
             _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
-            _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+            CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime, consumeEndTime
+                + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS));

Review comment:
       I can revert this change, but I want to keep some of the changes that remove warnings: this class has so many of them that it is very difficult to read.
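
For context, the compare-and-set pattern in the hunk above can be sketched in isolation. This is only an illustration under stated assumptions, not the code in `LLRealtimeSegmentDataManager`: the class `ConsumeDeadline`, the method `extendIfExpired`, and the one-hour `EXTENSION_HOURS` value are hypothetical. Only `AtomicLongFieldUpdater` and `TimeUnit` are real JDK classes.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical stand-in for the data manager; only the deadline handling is sketched.
public class ConsumeDeadline {
  // The updater needs the field to be a volatile long declared in this class.
  private static final AtomicLongFieldUpdater<ConsumeDeadline> CONSUME_END_TIME_UPDATER =
      AtomicLongFieldUpdater.newUpdater(ConsumeDeadline.class, "_consumeEndTime");

  // Assumed value, for illustration only.
  private static final int EXTENSION_HOURS = 1;

  private volatile long _consumeEndTime;

  public ConsumeDeadline(long consumeEndTime) {
    _consumeEndTime = consumeEndTime;
  }

  /**
   * Extends the deadline if it has passed. Returns true only if this call
   * performed the extension; a concurrent writer makes the CAS fail instead
   * of silently overwriting its update.
   */
  public boolean extendIfExpired(long now) {
    long observed = _consumeEndTime; // read the volatile field once
    if (now < observed) {
      return false;
    }
    return CONSUME_END_TIME_UPDATER.compareAndSet(this, observed,
        observed + TimeUnit.HOURS.toMillis(EXTENSION_HOURS));
  }

  public long getConsumeEndTime() {
    return _consumeEndTime;
  }
}
```

Reading the field into a local first means the comparison and the update both work from the same observed value, which is what the `long consumeEndTime = _consumeEndTime;` line in the hunk appears to be for.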






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772832180



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
##########
@@ -82,6 +88,13 @@ default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index
     return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
   }
 
+  /**
+   * @return last offset in the batch
+   */
+  default StreamPartitionMsgOffset getLastOffset() {
+    return getNextStreamParitionMsgOffsetAtIndex(getMessageCount() - 1);

Review comment:
       Indeed, `getMessageCount() - 1` is not an offset but an index into the batch of messages; `getLastOffset()` returns the next offset after the last message. `getNextStreamParitionMsgOffsetAtIndex` takes an _index_ and produces the next offset from the message at that index. Does this make sense?
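
To make the index/offset distinction concrete, here is a minimal sketch. It is a simplified stand-in, not the `MessageBatch` interface: the class `SimpleBatch`, its `long`-based methods, and the assumption that the "next offset" of a message is its own offset plus one are all hypothetical.

```java
// Hypothetical simplified batch: stores the stream offset of each retained message.
public class SimpleBatch {
  private final long[] _offsets;

  public SimpleBatch(long... offsets) {
    _offsets = offsets;
  }

  public int getMessageCount() {
    return _offsets.length;
  }

  // Takes an index into the batch (0 .. count - 1), not a stream offset,
  // and returns the offset to read after the message at that index.
  public long getNextOffsetAtIndex(int index) {
    return _offsets[index] + 1;
  }

  // Mirrors the shape of the default method under review:
  // the next offset after the last message in the batch.
  public long getLastOffset() {
    return getNextOffsetAtIndex(getMessageCount() - 1);
  }
}
```

With offsets 10, 11 and 15 in the batch, the last index is 2 and `getLastOffset()` yields 16. In this sketch an empty batch would fail with an `ArrayIndexOutOfBoundsException`, because the index would be -1.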






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771821781



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -423,12 +427,17 @@ protected boolean consumeLoop()
         consecutiveIdleCount = 0;
         // We consumed something. Update the highest stream offset as well as partition-consuming metric.
         // TODO Issue 5359 Need to find a way to bump metrics without getting actual offset value.
-//        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
-//        _currentOffset.getOffset());
-//        _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
-//        _currentOffset.getOffset());
+        //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_KAFKA_OFFSET_CONSUMED,
+        //_currentOffset.getOffset());
+        //_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
+        //_currentOffset.getOffset());
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
         lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+      } else if (messageBatch.getUnfilteredMessageCount() > 0) {
+        // we consumed something from the stream but filtered all the content out,
+        // so we need to advance the offsets to avoid getting stuck
+        _currentOffset = messageBatch.getLastOffset();
+        lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);

Review comment:
       This is the bug fix: it ensures that we advance the offset after consuming a batch that was entirely filtered out.
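
The control flow of the fix can be sketched on its own. This is a simplified model under stated assumptions, not the actual `consumeLoop()`: the class `FilteredBatchAdvance`, the `advance` method, plain `long` offsets, and the convention that `lastOffsetInBatch` is the position to resume from are all hypothetical.

```java
// Hypothetical model of one iteration of a consume loop.
public class FilteredBatchAdvance {

  /**
   * Returns the offset to resume from after handling one batch.
   *
   * @param currentOffset     offset before the batch was fetched
   * @param retainedOffsets   offsets of the messages that survived filtering
   * @param unfilteredCount   number of messages fetched before filtering
   * @param lastOffsetInBatch resume position reported by the batch (assumed convention)
   */
  public static long advance(long currentOffset, long[] retainedOffsets,
      int unfilteredCount, long lastOffsetInBatch) {
    // Normal path: indexing each retained message moves the offset forward.
    for (long offset : retainedOffsets) {
      currentOffset = offset + 1;
    }
    if (retainedOffsets.length == 0 && unfilteredCount > 0) {
      // Something was fetched but everything was filtered out:
      // skip past the batch so the same range is not fetched again.
      currentOffset = lastOffsetInBatch;
    }
    return currentOffset;
  }
}
```

Without the second branch, a batch made up only of filtered messages leaves the offset unchanged, so the next fetch starts from the same position.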






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771822134



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
##########
@@ -82,6 +88,13 @@ default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index
     return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
   }
 
+  /**
+   * @return last offset in the batch
+   */
+  default StreamPartitionMsgOffset getLastOffset() {
+    return getNextStreamParitionMsgOffsetAtIndex(getMessageCount() - 1);

Review comment:
       Overridden for Kafka 2.0 only. I don't want to expand the scope to other implementations if it slows down fixing the Kafka bug.
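
As an illustration of "default in the interface, override in one implementation", here is a sketch. None of these names come from the PR: `Batch`, `TrackingBatch`, and the constructor arguments are hypothetical, and the real Kafka 2.0 override may look different.

```java
import java.util.List;

public class LastOffsetOverride {

  // Hypothetical simplified batch interface with a default getLastOffset().
  public interface Batch {
    int getMessageCount();

    long getNextOffsetAtIndex(int index);

    // Default: derive the last offset from the last retained message.
    default long getLastOffset() {
      return getNextOffsetAtIndex(getMessageCount() - 1);
    }
  }

  // Hypothetical implementation that records the resume position while
  // fetching, so the answer does not depend on any message being retained.
  public static final class TrackingBatch implements Batch {
    private final List<Long> _retainedOffsets;
    private final long _lastOffset;

    public TrackingBatch(List<Long> retainedOffsets, long lastOffset) {
      _retainedOffsets = retainedOffsets;
      _lastOffset = lastOffset;
    }

    @Override
    public int getMessageCount() {
      return _retainedOffsets.size();
    }

    @Override
    public long getNextOffsetAtIndex(int index) {
      return _retainedOffsets.get(index) + 1;
    }

    @Override
    public long getLastOffset() {
      return _lastOffset;
    }
  }
}
```

Implementations that do not override the method keep the default behaviour, which is why the change can stay scoped to a single stream plugin.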






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (104a3bf) into [master](https://codecov.io/gh/apache/pinot/commit/71fefe2f16f15c112944db848f9a72e6e6e83ce6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (71fefe2) will **decrease** coverage by `56.73%`.
   > The diff coverage is `56.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7927       +/-   ##
   =============================================
   - Coverage     71.07%   14.33%   -56.74%     
   + Complexity     4112       80     -4032     
   =============================================
     Files          1593     1548       -45     
     Lines         82372    80508     -1864     
     Branches      12270    12072      -198     
   =============================================
   - Hits          58545    11542    -47003     
   - Misses        19872    68109    +48237     
   + Partials       3955      857     -3098     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `14.33% <56.00%> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (-72.35%)` | :arrow_down: |
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `0.00% <ø> (-92.31%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `53.84% <66.66%> (-37.83%)` | :arrow_down: |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `80.00% <76.92%> (-6.67%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/core/data/table/Key.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL0tleS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/spi/utils/BooleanUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQm9vbGVhblV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/data/table/Record.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1JlY29yZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/util/GroupByUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS91dGlsL0dyb3VwQnlVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1268 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [71fefe2...104a3bf](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] mcvsubbu commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772617262



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -294,15 +300,17 @@ public void deleteSegmentFile() {
   private boolean endCriteriaReached() {
     Preconditions.checkState(_state.shouldConsume(), "Incorrect state %s", _state);
     long now = now();
+    long consumeEndTime = _consumeEndTime;
     switch (_state) {
       case INITIAL_CONSUMING:
         // The segment has been created, and we have not posted a segmentConsumed() message on the controller yet.
         // We need to consume as much data as available, until we have either reached the max number of rows or
         // the max time we are allowed to consume.
-        if (now >= _consumeEndTime) {
+        if (now >= consumeEndTime) {
           if (_realtimeSegment.getNumDocsIndexed() == 0) {
             _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
-            _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
+            CONSUME_END_TIME_UPDATER.compareAndSet(this, consumeEndTime, consumeEndTime
+                + TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS));

Review comment:
       I am +1 on Jackie's comment. Unless we see a need, can we keep it as it is?
   The only time the object is accessed from multiple threads is during completion, when an ONLINE state transition is received.






[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (e0a9a2a) into [master](https://codecov.io/gh/apache/pinot/commit/71fefe2f16f15c112944db848f9a72e6e6e83ce6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (71fefe2) will **decrease** coverage by `56.73%`.
   > The diff coverage is `56.00%`.
   
   > :exclamation: Current head e0a9a2a differs from pull request most recent head 5214304. Consider uploading reports for the commit 5214304 to get more accurate results
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #7927       +/-   ##
   =============================================
   - Coverage     71.07%   14.34%   -56.74%     
   + Complexity     4112       80     -4032     
   =============================================
     Files          1593     1548       -45     
     Lines         82372    80507     -1865     
     Branches      12270    12072      -198     
   =============================================
   - Hits          58545    11546    -46999     
   - Misses        19872    68105    +48233     
   + Partials       3955      856     -3099     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `14.34% <56.00%> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (-72.35%)` | :arrow_down: |
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `0.00% <ø> (-92.31%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `53.84% <66.66%> (-37.83%)` | :arrow_down: |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `80.00% <76.92%> (-6.67%)` | :arrow_down: |
   | [...ain/java/org/apache/pinot/core/data/table/Key.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL0tleS5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/spi/utils/BooleanUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQm9vbGVhblV0aWxzLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/data/table/Record.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL3RhYmxlL1JlY29yZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../java/org/apache/pinot/core/util/GroupByUtils.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS91dGlsL0dyb3VwQnlVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1268 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [71fefe2...5214304](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] codecov-commenter edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-997206576


   # [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#7927](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (104a3bf) into [master](https://codecov.io/gh/apache/pinot/commit/71fefe2f16f15c112944db848f9a72e6e6e83ce6?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (71fefe2) will **decrease** coverage by `6.33%`.
   > The diff coverage is `56.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pinot/pull/7927/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #7927      +/-   ##
   ============================================
   - Coverage     71.07%   64.74%   -6.34%     
   + Complexity     4112     4111       -1     
   ============================================
     Files          1593     1548      -45     
     Lines         82372    80508    -1864     
     Branches      12270    12072     -198     
   ============================================
   - Hits          58545    52123    -6422     
   - Misses        19872    24671    +4799     
   + Partials       3955     3714     -241     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `68.05% <0.00%> (-0.02%)` | :arrow_down: |
   | unittests2 | `14.33% <56.00%> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `46.56% <0.00%> (-25.79%)` | :arrow_down: |
   | [...in/stream/kinesis/KinesisPartitionGroupOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWtpbmVzaXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3Bpbm90L3BsdWdpbi9zdHJlYW0va2luZXNpcy9LaW5lc2lzUGFydGl0aW9uR3JvdXBPZmZzZXQuamF2YQ==) | `17.39% <ø> (ø)` | |
   | [...ava/org/apache/pinot/spi/stream/LongMsgOffset.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL0xvbmdNc2dPZmZzZXQuamF2YQ==) | `92.30% <ø> (ø)` | |
   | [...java/org/apache/pinot/spi/stream/MessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL01lc3NhZ2VCYXRjaC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...pinot/plugin/stream/kafka20/KafkaMessageBatch.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthTWVzc2FnZUJhdGNoLmphdmE=) | `53.84% <66.66%> (-37.83%)` | :arrow_down: |
   | [...in/stream/kafka20/KafkaPartitionLevelConsumer.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25zdW1lci5qYXZh) | `80.00% <76.92%> (-6.67%)` | :arrow_down: |
   | [...a/org/apache/pinot/common/metrics/MinionMeter.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25NZXRlci5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...g/apache/pinot/common/metrics/ControllerMeter.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyTWV0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/BrokerQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Ccm9rZXJRdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [.../apache/pinot/common/metrics/MinionQueryPhase.java](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9NaW5pb25RdWVyeVBoYXNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [372 more](https://codecov.io/gh/apache/pinot/pull/7927/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [71fefe2...104a3bf](https://codecov.io/gh/apache/pinot/pull/7927?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] sajjad-moradi commented on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-998441097


   On a separate note, I saw that this PR originally had some optimizations piggybacked onto it. We should avoid doing that. Each PR should focus on only one feature, one bug fix, etc. Any optimization or refactoring should go in a separate PR. That might take a little longer for the author, but it benefits all of us in the long run. 
   One example of this issue happened last week, when we spent a long time finding the root cause of the chunk index writer bug. The problematic optimization was added in a PR titled `Add MV raw forward index and MV BYTES data type`. The table with the performance problem didn't have any multi-value columns or BYTES data type, so we couldn't easily tell whether that commit was related. And there were a lot of commits we needed to examine. If we had had a separate PR for that optimization, we could have found the root cause much more easily!




[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772989592



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
##########
@@ -18,61 +18,53 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
 
   @Override
-  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
-      int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    final Iterable<ConsumerRecord<String, Bytes>> messageAndOffsetIterable =
-        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
-    return new KafkaMessageBatch(messageAndOffsetIterable);
-  }
-
-  private Iterable<ConsumerRecord<String, Bytes>> buildOffsetFilteringIterable(
-      final List<ConsumerRecord<String, Bytes>> messageAndOffsets, final long startOffset, final long endOffset) {
-    return Iterables.filter(messageAndOffsets, input -> {
-      // Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
-      return input != null && input.value() != null && input.offset() >= startOffset && (endOffset > input.offset()
-          || endOffset == -1);
-    });
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    super.close();
+    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
+    List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());
+    long lastOffset = startOffset;
+    for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      if (messageAndOffset != null) {
+        Bytes message = messageAndOffset.value();
+        long offset = messageAndOffset.offset();
+        if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
+          if (message != null) {
+            filtered.add(new MessageAndOffset(message.get(), offset));
+          }
+          lastOffset = offset;
+        }
+      }
+    }
+    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);

Review comment:
       I don’t think that’s an accurate reading of the code. `lastOffset` takes the offset of the last message *in the requested offset range*, even if that message is a tombstone, so in general it is not equal to start + 1 unless all messages were outside of the polled offset range. It’s also not clear why the offset range filtering exists: I don’t think the offset range check can fail. 
   
   @xiangfu0 can you comment why the offset range check exists since I believe you added it?
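
The behaviour described in this comment can be illustrated with a minimal, self-contained sketch. It does not use the Kafka client; `Rec`, `Batch`, and `filter` are hypothetical stand-ins for `ConsumerRecord`, `KafkaMessageBatch`, and the loop in `fetchMessages`, and the range check is written with short-circuit operators for readability. It only aims to show that `lastOffset` advances past tombstones even when nothing survives filtering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TombstoneBatchSketch {
  // Hypothetical stand-in for a Kafka ConsumerRecord: an offset plus a nullable payload.
  static final class Rec {
    final long offset;
    final byte[] value; // null models a tombstone

    Rec(long offset, byte[] value) {
      this.offset = offset;
      this.value = value;
    }
  }

  // Hypothetical stand-in for the batch returned to the segment data manager.
  static final class Batch {
    final int unfilteredCount;
    final long lastOffset;
    final List<Rec> kept;

    Batch(int unfilteredCount, long lastOffset, List<Rec> kept) {
      this.unfilteredCount = unfilteredCount;
      this.lastOffset = lastOffset;
      this.kept = kept;
    }
  }

  // Mirrors the shape of the loop in the diff: tombstones are dropped from the
  // payload list, but any record inside the requested range still moves lastOffset.
  static Batch filter(List<Rec> polled, long startOffset, long endOffset) {
    List<Rec> kept = new ArrayList<>(polled.size());
    long lastOffset = startOffset;
    for (Rec rec : polled) {
      if (rec == null) {
        continue;
      }
      long offset = rec.offset;
      if (offset >= startOffset && (endOffset > offset || endOffset == -1)) {
        if (rec.value != null) {
          kept.add(rec);
        }
        lastOffset = offset;
      }
    }
    return new Batch(polled.size(), lastOffset, kept);
  }

  public static void main(String[] args) {
    // A poll that returned only tombstones at offsets 10, 11 and 12.
    List<Rec> polled = Arrays.asList(new Rec(10, null), new Rec(11, null), new Rec(12, null));
    Batch batch = filter(polled, 10, Long.MAX_VALUE);
    System.out.println("kept=" + batch.kept.size()
        + " unfiltered=" + batch.unfilteredCount
        + " lastOffset=" + batch.lastOffset);
  }
}
```

With a batch like this, a consumer that only looks at the surviving messages has nothing to advance on, whereas one that reads `lastOffset` can still move forward.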






[GitHub] [pinot] richardstartin commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r771823033



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -543,14 +550,14 @@ private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeS
 
       _currentOffset = messagesAndOffsets.getNextStreamParitionMsgOffsetAtIndex(index);
       _numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
-      _numRowsConsumed++;
       streamMessageCount++;
+      NUM_ROWS_CONSUMED_UPDATER.incrementAndGet(this);

Review comment:
       Concurrency bug: `++` on a volatile variable is a non-atomic read-modify-write, so concurrent increments can be lost. I believe this may have resulted in the wrong number of rows in segment metadata, but I haven't tracked where this value goes beyond `SegmentCompletionProtocol`.
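
A minimal sketch of the pattern in this hunk, for illustration only. The diff shows just the name `NUM_ROWS_CONSUMED_UPDATER`; that it is an `AtomicIntegerFieldUpdater` over an `int` field is an assumption here, and `RowCounterSketch` is a hypothetical stand-in for the data manager class.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class RowCounterSketch {
  // The field stays volatile so plain reads still see the latest value.
  // (Assumed to be an int; the real field's type is not shown in the diff.)
  private volatile int _numRowsConsumed;

  // The updater performs the read-modify-write atomically, unlike `_numRowsConsumed++`.
  private static final AtomicIntegerFieldUpdater<RowCounterSketch> NUM_ROWS_CONSUMED_UPDATER =
      AtomicIntegerFieldUpdater.newUpdater(RowCounterSketch.class, "_numRowsConsumed");

  void recordRow() {
    NUM_ROWS_CONSUMED_UPDATER.incrementAndGet(this);
  }

  int numRowsConsumed() {
    return _numRowsConsumed;
  }

  // Increments one shared counter from several threads and returns the final count.
  static int countConcurrently(int threads, int incrementsPerThread) {
    RowCounterSketch counter = new RowCounterSketch();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < incrementsPerThread; j++) {
          counter.recordRow();
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for workers", e);
      }
    }
    return counter.numRowsConsumed();
  }

  public static void main(String[] args) {
    // With atomic increments no update is lost, so the total is threads * incrementsPerThread.
    System.out.println(countConcurrently(4, 100_000));
  }
}
```

Replacing `recordRow()` with a plain `_numRowsConsumed++` in the same harness would typically produce a smaller total, because two threads can read the same value and both write back value + 1.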






[GitHub] [pinot] richardstartin edited a comment on pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
richardstartin edited a comment on pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#issuecomment-998475535


   > On a separate note, I saw that originally there were some optimizations piggy-backed to this PR. We should avoid doing that. Each PR should only focus on one feature, one bug fix, etc. Any optimization or refactoring should go in a separate PR. That might take a little longer for the author, but it benefits all of us in the long run.
   
   There were no optimisations in this PR; there were some fixes for concurrency bugs (e.g. it's incorrect to use an increment operator on a volatile variable), so I'm not sure what you're referring to. 
   
   > One example of this issue happened last week when we spent a long time finding the root cause for the chunk index writer bug. The problematic optimization was added in a PR with title Add MV raw forward index and MV BYTES data type. 
   
   I'm not sure this is the best place to discuss this, but you are implying that the change you mentioned was unrelated to the PR it was made in, and it wasn't. As has been discussed, the change was made because that particular class creates very large buffers. It was included in that PR because using the class for MV BYTES columns exacerbates the problem by multiplying what is already an overestimate of the buffer size by the maximum number of elements in the column. So the change was a mitigation for a worst case made worse by that PR.
   
   > The table having performance problem didn't have any multi-value columns or byte data type. So we couldn't easily tell if this commit is related or not. And there were a lot of commits we needed to examine. If we had a separate PR for that optimization, we could've found the root cause much easier!
   
   Looking at commits to figure out what changed isn't an efficient diagnostic technique. Had you used a profiler instead, you would have found that the process was spending a large amount of time in the `mmap` and `munmap` syscalls; looking at the git blame for each Pinot stack frame above those syscalls would have found the culprit in O(stack depth) time rather than O(lines of code changed). 




[GitHub] [pinot] sajjad-moradi commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772771553



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
##########
@@ -82,6 +88,13 @@ default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index
     return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
   }
 
+  /**
+   * @return last offset in the batch
+   */
+  default StreamPartitionMsgOffset getLastOffset() {
+    return getNextStreamParitionMsgOffsetAtIndex(getMessageCount() - 1);

Review comment:
       `getMessageCount() - 1` is not a valid offset. Since this method is overridden for Kafka 2.0 and the other streams are not supposed to use it, at least for now, I believe we should throw an `IllegalStateException` here.
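
A rough sketch of what this suggestion could look like, using hypothetical types rather than the real `MessageBatch` interface: `BatchSketch`, `KafkaLikeBatch`, and `OtherStreamBatch` are invented for illustration, and offsets are plain `long` values instead of `StreamPartitionMsgOffset`.

```java
public class LastOffsetSketch {
  // Hypothetical, trimmed-down stand-in for the MessageBatch interface.
  interface BatchSketch {
    int getMessageCount();

    // Default fails loudly instead of deriving an offset from the message count.
    default long getLastOffset() {
      throw new IllegalStateException("getLastOffset() is not implemented for this stream type");
    }
  }

  // A stream implementation that tracks the last offset itself and overrides the default.
  static final class KafkaLikeBatch implements BatchSketch {
    private final int _messageCount;
    private final long _lastOffset;

    KafkaLikeBatch(int messageCount, long lastOffset) {
      _messageCount = messageCount;
      _lastOffset = lastOffset;
    }

    @Override
    public int getMessageCount() {
      return _messageCount;
    }

    @Override
    public long getLastOffset() {
      return _lastOffset;
    }
  }

  // A stream implementation that does not override getLastOffset().
  static final class OtherStreamBatch implements BatchSketch {
    @Override
    public int getMessageCount() {
      return 0;
    }
  }

  // Returns true when the batch rejects getLastOffset() with an IllegalStateException.
  static boolean rejectsLastOffset(BatchSketch batch) {
    try {
      batch.getLastOffset();
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(new KafkaLikeBatch(0, 42L).getLastOffset());
    System.out.println(rejectsLastOffset(new OtherStreamBatch()));
  }
}
```

The trade-off in this shape is that a stream which forgets to override the method fails at the first call rather than silently reporting an offset computed from an index.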





