Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/03/17 06:37:00 UTC

[GitHub] [druid] zhangyue19921010 opened a new pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

zhangyue19921010 opened a new pull request #11006:
URL: https://github.com/apache/druid/pull/11006


   Fixes https://github.com/apache/druid/issues/11005.
   
   ### Description
   Fetch a single record using an `AFTER_SEQUENCE_NUMBER` shard iterator and check the result. If it is empty, there is no new data in the stream after the current offset, so return `0L` as the lag.
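
The check described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in `KinesisRecordSupplier.java`: `FetchResult` and `SingleRecordFetcher` are hypothetical stand-ins for the Kinesis client and its result type, and the fetchers in `main` return simulated values.

```java
import java.util.Collections;
import java.util.List;

final class KinesisLagSketch
{
  /** Hypothetical stand-in for a single-record fetch result; not the AWS SDK type. */
  static final class FetchResult
  {
    final List<String> records;
    final long millisBehindLatest;

    FetchResult(List<String> records, long millisBehindLatest)
    {
      this.records = records;
      this.millisBehindLatest = millisBehindLatest;
    }
  }

  /** Hypothetical fetcher: reads at most one record with the given shard iterator type. */
  interface SingleRecordFetcher
  {
    FetchResult fetch(String iteratorType, String sequenceNumber);
  }

  static long timeLagMillis(SingleRecordFetcher fetcher, String iteratorType, String offsetToUse)
  {
    // Probe strictly after the offset: an empty result is treated as "caught up".
    FetchResult probe = fetcher.fetch("AFTER_SEQUENCE_NUMBER", offsetToUse);
    if (probe.records.isEmpty()) {
      return 0L;
    }
    // Otherwise re-read with the original iterator type and report the lag it returns.
    return fetcher.fetch(iteratorType, offsetToUse).millisBehindLatest;
  }

  public static void main(String[] args)
  {
    // Simulated idle stream: no records after the offset, but a non-zero reported lag.
    SingleRecordFetcher idle = (type, seq) -> new FetchResult(Collections.emptyList(), 60_000L);
    // Simulated active stream: one record available and a reported lag of 1500 ms.
    SingleRecordFetcher busy = (type, seq) -> new FetchResult(Collections.singletonList("record"), 1_500L);

    System.out.println(timeLagMillis(idle, "AT_SEQUENCE_NUMBER", "100"));
    System.out.println(timeLagMillis(busy, "AT_SEQUENCE_NUMBER", "100"));
  }
}
```

The trade-off in this shape is one extra fetch per lag computation whenever the stream still has data, in exchange for a stable zero when it does not.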
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `KinesisRecordSupplier.java`
   
   <hr>
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] clintropolis commented on a change in pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #11006:
URL: https://github.com/apache/druid/pull/11006#discussion_r595748239



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -863,21 +863,36 @@ private Long getPartitionTimeLag(StreamPartition<String> partition, String offse
         iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
         offsetToUse = offset;
       }
-      String shardIterator = kinesis.getShardIterator(
-          partition.getStream(),
-          partition.getPartitionId(),
-          iteratorType,
-          offsetToUse
-      ).getShardIterator();
 
-      GetRecordsResult recordsResult = kinesis.getRecords(
-          new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
-      );
+      GetRecordsResult recordsResult = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), offsetToUse, partition);
+
+      // If no more new data after offsetToUse, it means there is no lag for now.
+      // So report lag points as 0L.
+      if (recordsResult.getRecords().size() == 0) {
+        return 0L;
+      } else {
+        recordsResult = getRecords(iteratorType, offsetToUse, partition);
+      }
 
       return recordsResult.getMillisBehindLatest();
     });
   }
 
+  private GetRecordsResult getRecords(String iteratorType, String offsetToUse, StreamPartition<String> partition)

Review comment:
       nit: might be nice to name this method `getRecordsForLag` or something similar to indicate its limited purpose (the `.withLimit(1)` makes it probably not really useful for actually getting records, nor does it need to be because other methods are handling that)






[GitHub] [druid] zhangyue19921010 commented on pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
zhangyue19921010 commented on pull request #11006:
URL: https://github.com/apache/druid/pull/11006#issuecomment-800957796


   Hi @clintropolis, thanks for your review and approval! Job 75 `(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests` failed, but I don't think the failure is related to this PR. A retry may pass :)




[GitHub] [druid] zhangyue19921010 commented on pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
zhangyue19921010 commented on pull request #11006:
URL: https://github.com/apache/druid/pull/11006#issuecomment-800838266


   Hi @clintropolis, could you please take a look at this issue at your convenience? I see that you introduced this useful metric in https://github.com/apache/druid/pull/9509, so you may be more familiar with it :)




[GitHub] [druid] suneet-s commented on pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #11006:
URL: https://github.com/apache/druid/pull/11006#issuecomment-801120972


   @zhangyue19921010 have you been able to run the kinesis integration tests locally to see if those tests continue to pass after this change? There are some instructions on how to run them here - https://github.com/apache/druid/blob/master/integration-tests/README.md#running-a-test-that-uses-cloud




[GitHub] [druid] suneet-s merged pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
suneet-s merged pull request #11006:
URL: https://github.com/apache/druid/pull/11006


   




[GitHub] [druid] zhangyue19921010 commented on a change in pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
zhangyue19921010 commented on a change in pull request #11006:
URL: https://github.com/apache/druid/pull/11006#discussion_r595782296



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -863,21 +863,36 @@ private Long getPartitionTimeLag(StreamPartition<String> partition, String offse
         iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString();
         offsetToUse = offset;
       }
-      String shardIterator = kinesis.getShardIterator(
-          partition.getStream(),
-          partition.getPartitionId(),
-          iteratorType,
-          offsetToUse
-      ).getShardIterator();
 
-      GetRecordsResult recordsResult = kinesis.getRecords(
-          new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
-      );
+      GetRecordsResult recordsResult = getRecords(ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), offsetToUse, partition);
+
+      // If no more new data after offsetToUse, it means there is no lag for now.
+      // So report lag points as 0L.
+      if (recordsResult.getRecords().size() == 0) {
+        return 0L;
+      } else {
+        recordsResult = getRecords(iteratorType, offsetToUse, partition);
+      }
 
       return recordsResult.getMillisBehindLatest();
     });
   }
 
+  private GetRecordsResult getRecords(String iteratorType, String offsetToUse, StreamPartition<String> partition)

Review comment:
       Changed. Thanks for your review.






[GitHub] [druid] zhangyue19921010 commented on pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
zhangyue19921010 commented on pull request #11006:
URL: https://github.com/apache/druid/pull/11006#issuecomment-803228766


   Thanks a lot :) @suneet-s and @clintropolis 




[GitHub] [druid] suneet-s commented on pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
suneet-s commented on pull request #11006:
URL: https://github.com/apache/druid/pull/11006#issuecomment-802887153


   Thanks for the contribution @zhangyue19921010 !




[GitHub] [druid] zhangyue19921010 commented on pull request #11006: [BUG FIX]Kinesis lag keep increasing when there is no more new data for kinesis stream

Posted by GitBox <gi...@apache.org>.
zhangyue19921010 commented on pull request #11006:
URL: https://github.com/apache/druid/pull/11006#issuecomment-801685093


   Hi @suneet-s, thanks for the reminder. I just ran the Kinesis integration tests locally and they passed.
   
   <img width="1082" alt="Screenshot 2021-03-18 3.02.46 PM" src="https://user-images.githubusercontent.com/69956021/111586092-4808a800-87fb-11eb-9648-7a1af38a209f.png">
   

