You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:18 UTC

[incubator-pinot] 12/23: fetch records with timeout

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 900450da05347fd6ce958053141b701c7e09d0e9
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 11:44:38 2020 +0530

    fetch records with timeout
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 30 ++++++++++++++++++----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 96241d4..910b9ee 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -21,6 +21,12 @@ package org.apache.pinot.plugin.stream.kinesis;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
@@ -39,6 +45,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   String _stream;
   Integer _maxRecords;
   String _shardId;
+  ExecutorService _executorService;
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
     super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -46,10 +53,27 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     _maxRecords = kinesisConfig.maxRecordsToFetch();
     KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
     _shardId = kinesisShardMetadata.getShardId();
+    _executorService = Executors.newSingleThreadExecutor();
   }
 
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(new Callable<KinesisFetchResult>() { // hand the read to _executorService so the wait below can be bounded
+      @Override
+      public KinesisFetchResult call()
+          throws Exception {
+        return getResult(start, end);
+      }
+    });
+    // Block for at most `timeout` milliseconds waiting for the submitted task to finish.
+    try {
+      return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
+    } catch(Exception e){ // NOTE(review): catches everything Future.get can throw (timeout, interruption, task failure) and does not log it
+      return null; // NOTE(review): callers get null on any failure; the future is not cancelled here, so the submitted task may keep running - confirm this is intended
+    }
+  }
+
+  private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
     try {
       KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
 
@@ -65,9 +89,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       }
 
       String nextStartSequenceNumber = null;
-      Long startTimestamp = System.currentTimeMillis();
 
-      while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
+      while (shardIterator != null) {
         GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
         GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
 
@@ -119,7 +142,4 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
     return getShardIteratorResponse.shardIterator();
   }
 
-  private boolean isTimedOut(Long startTimestamp, Long timeout) {
-    return (System.currentTimeMillis() - startTimestamp) >= timeout;
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org