You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2021/01/03 01:22:18 UTC
[incubator-pinot] 12/23: fetch records with timeout
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 900450da05347fd6ce958053141b701c7e09d0e9
Author: KKcorps <kh...@gmail.com>
AuthorDate: Sun Dec 20 11:44:38 2020 +0530
fetch records with timeout
---
.../plugin/stream/kinesis/KinesisConsumer.java | 30 ++++++++++++++++++----
1 file changed, 25 insertions(+), 5 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 96241d4..910b9ee 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -21,6 +21,12 @@ package org.apache.pinot.plugin.stream.kinesis;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.v2.Checkpoint;
import org.apache.pinot.spi.stream.v2.ConsumerV2;
@@ -39,6 +45,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
String _stream;
Integer _maxRecords;
String _shardId;
+ ExecutorService _executorService;
public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -46,10 +53,27 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
_maxRecords = kinesisConfig.maxRecordsToFetch();
KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata;
_shardId = kinesisShardMetadata.getShardId();
+ _executorService = Executors.newSingleThreadExecutor();
}
@Override
public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
+ Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(new Callable<KinesisFetchResult>() {
+ @Override
+ public KinesisFetchResult call()
+ throws Exception {
+ return getResult(start, end);
+ }
+ });
+
+ try {
+ return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
+ } catch(Exception e){
+ return null;
+ }
+ }
+
+ private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) {
try {
KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start;
@@ -65,9 +89,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
}
String nextStartSequenceNumber = null;
- Long startTimestamp = System.currentTimeMillis();
- while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) {
+ while (shardIterator != null) {
GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build();
GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest);
@@ -119,7 +142,4 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
return getShardIteratorResponse.shardIterator();
}
- private boolean isTimedOut(Long startTimestamp, Long timeout) {
- return (System.currentTimeMillis() - startTimestamp) >= timeout;
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org