Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/03/17 06:34:56 UTC

[GitHub] [druid] zhangyue19921010 opened a new issue #11005: Lag keep increasing when there is no more new data for kinesis.

zhangyue19921010 opened a new issue #11005:
URL: https://github.com/apache/druid/issues/11005


   ## Describe the bug
   Lag for Kinesis keeps increasing after new data stops arriving in the stream.
   
   ## Expected Behavior
   Lag should be zero when there is no new data.
   
   ## Current Behavior
   Lag keeps increasing.
   
   ## Steps to Reproduce
   1. Produce data to Kinesis.
   2. Submit the supervisor spec `kinesis.json` to the Overlord.
   3. Stop the Kinesis producer.
   4. Lag keeps increasing even though no new data is arriving in Kinesis.
   
   I think this may be caused by a misunderstanding of the API `recordsResult.getMillisBehindLatest()`.
   
   Here is the documentation for this API:
   ```
   MillisBehindLatest
   The number of milliseconds the GetRecords response is from the tip of the stream, indicating how far behind current time the consumer is. A value of zero indicates that record processing is caught up, and there are no new records to process at this moment.
   Type: Long
   Valid Range: Minimum value of 0.
   ```
   I think it means `MillisBehindLatest = consumeTime - dataTime`.
   
   Here is my test code:
   ```
   
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   import com.amazonaws.regions.Regions;
   import com.amazonaws.services.kinesis.AmazonKinesis;
   import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
   import com.amazonaws.services.kinesis.model.GetRecordsRequest;
   import com.amazonaws.services.kinesis.model.GetRecordsResult;
   import com.amazonaws.services.kinesis.model.ShardIteratorType;
   
   public class KinesisGetTest {
   
       public static void main(String[] args) {
           AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
                   .withCredentials(DefaultAWSCredentialsProviderChain.getInstance())
                   .withRegion(Regions.US_EAST_1)
                   .build();
           String streamName ="dummyStream";
   
           getStockTrades(kinesis,streamName);
           getLatest(kinesis,streamName);
       }
   
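       // Reads one record at a fixed, already-ingested sequence number and prints its MillisBehindLatest.
       public static void getStockTrades(AmazonKinesis kinesis, String streamName) {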
           String shardIterator = kinesis.getShardIterator(
                   streamName,
                   "shardId-000000000001",
                   ShardIteratorType.AT_SEQUENCE_NUMBER.toString(),
                   "49609775258711157847444017307218867996992776863817400338"
           ).getShardIterator();
           GetRecordsResult recordsResult = kinesis.getRecords(
                   new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
           );
           System.out.println("Current : " + recordsResult);
           System.out.println("Current : " + recordsResult.getRecords().size());
           System.out.println(recordsResult.getMillisBehindLatest());
       }
   
       // Reads from the tip of the shard (LATEST) and prints MillisBehindLatest for comparison.
       public static void getLatest(AmazonKinesis kinesis, String streamName) {
           String shardIterator = kinesis.getShardIterator(
                   streamName,
                   "shardId-000000000001",
                   ShardIteratorType.LATEST.toString()
           ).getShardIterator();
           GetRecordsResult recordsResult = kinesis.getRecords(
                   new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1)
           );
           System.out.println("Latest: " +  recordsResult);
           System.out.println("Latest: " +  recordsResult.getRecords().size());
           System.out.println(recordsResult.getMillisBehindLatest());
       }
   }
   ```
   
   Here is the output from two runs. First run:
   ```
   Current : {Records: [{SequenceNumber: 49609775258711157847444017307218867996992776863817400338,ApproximateArrivalTimestamp: Fri Mar 12 10:33:54 CST 2021,Data: xxx,PartitionKey: xxxx#7579871697745#WZqoyxnp,}],NextShardIterator: AAAAAAAAAAEfEfaE4kbzGrymjJ6+0tfNi0xS791lzLKKtdMuRjZeTELzflQ+aIvS4bBscPf4hH4FiyHOBu1O61YfBlu/BkFM07us9WfHqro7dwvJU5wkMIQ/gV4msye/dBia2Rp//fRqgLpIuYhkKxCoR26t212sMJOgoObxSWfw+bHXpaBnxu5mV6O6buYrEyEGjeyI4cPZuSUOTZZ/8/kazJcVhzVF5J+q153ScagBJNK34PalhNQfsVQou+H5esxieJqz61fil7Q7+cXUlTDCM7x77XkI,MillisBehindLatest: 12185000}
   Current : 1
   12185000
   Latest: {Records: [],NextShardIterator: AAAAAAAAAAHmHyT2SkiJsKfdoi4FTop/Iuap5WVN3ihGGTivPOQUzS84Q6HOHZvGt0Yov136H+7o3UVkG2uzAY9sLdApsV5Q26Ahi+fNEN1DKihCZ90rSoOAbjXLykx1bAkrNjmJCzFU5XYVFKh9SHmn//UBWSl55Q7hbcdvPSLbfQRaQBumsDgXZY5F2xF5QZ6YCDoqUt01liOxYTRCFC33V6XXjmZWU69NTaUJ7Ku92BQGbeWVvOIP6Ae6xHGYNA/01c34dXJchY8EUxozdYUPbDw+Hvfb,MillisBehindLatest: 0}
   Latest: 0
   0
   ```
   
   Second run:
   ```
   Current : {Records: [{SequenceNumber: 49609775258711157847444017307218867996992776863817400338,ApproximateArrivalTimestamp: Fri Mar 12 10:33:54 CST 2021,Data: xxx,PartitionKey: xxx#7579871697745#WZqoyxnp,}],NextShardIterator: AAAAAAAAAAG8yLLTlrY/zb2pHM7XMppR4M55aXnaJbppstPqGwgxJIeT1hDJAhCm5y1Zq9cMe+1ZMjPu5Cygixs+Ek9RcAkW4oRoE3ja2AGXSRi4pNNbLrDYQoOWOahNvZmKm3B1NzMYu6vwJgiosp6BUQKeby6bzZxGYD1qrLbXLVPD+SvzVsRTxWFBXDXhTjdYwu8iflRNJigZUYkXi1g4YlPJcCly8yRWSMiY9irpK6mDXPc4MzJusdWDJshueRG7lLZjHcRQb66H+lNcWH62onvPd6h+,MillisBehindLatest: 12252000}
   Current : 1
   12252000
   Latest: {Records: [],NextShardIterator: AAAAAAAAAAGFg2h3aQIOXX3rOHct2oQMILcvyqDchVTR3P2JH9kUi5keWhjwavVLzz80Biz13k/b56spezqmx9ZzYZ323FqMZUvCO/LV6P8FAi1eQC8KzYJZS0CPdEn0mZNyZ2jxevtvLRwXaMpUgbBvIG+G1U0w8FctuYogze/O6mN9cEg9eAV4lsGy40jE7i8k4T1S7k2Qzw5YA/gtAwgJmmMMUypJdyJ7BoYQ+fhG8KfVvJYrvAaf4bXBzbtOmrYy0HSKlndwTxJM5aisN/ZQh7UvcDpf,MillisBehindLatest: 0}
   Latest: 0
   0
   ```
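   
   To make the comparison concrete, here is a minimal sketch that only does arithmetic on the values printed above. It assumes the hypothesis `MillisBehindLatest = consumeTime - dataTime` holds, and it treats `ApproximateArrivalTimestamp` as `dataTime`. The class and method names are hypothetical and are not part of Druid or the AWS SDK.
   
   ```java
   import java.time.Duration;
   import java.time.LocalDateTime;
   
   public class MillisBehindLatestCheck {
   
       // Read time implied by the hypothesis MillisBehindLatest = consumeTime - dataTime.
       public static LocalDateTime impliedConsumeTime(LocalDateTime arrival, long millisBehindLatest) {
           return arrival.plus(Duration.ofMillis(millisBehindLatest));
       }
   
       public static void main(String[] args) {
           // ApproximateArrivalTimestamp of the record, in the local (CST) wall-clock time shown in the output.
           LocalDateTime arrival = LocalDateTime.of(2021, 3, 12, 10, 33, 54);
           long firstRun = 12_185_000L;   // MillisBehindLatest from the first run
           long secondRun = 12_252_000L;  // MillisBehindLatest from the second run
   
           // The same record reports a larger value on the second read.
           System.out.println("growth between runs (ms): " + (secondRun - firstRun));
           System.out.println("implied first read:  " + impliedConsumeTime(arrival, firstRun));
           System.out.println("implied second read: " + impliedConsumeTime(arrival, secondRun));
       }
   }
   ```
   
   The value for the same sequence number grows by 67000 ms between the two runs, and under the hypothesis the implied read times are 13:56:59 and 13:58:06, also 67 seconds apart. That is consistent with the value tracking wall-clock time rather than the amount of unread data, while the `LATEST` iterator reports 0 in both runs.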
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] suneet-s closed issue #11005: Lag keep increasing when there is no more new data for kinesis.

Posted by GitBox <gi...@apache.org>.
suneet-s closed issue #11005:
URL: https://github.com/apache/druid/issues/11005


   

