You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2021/07/02 05:15:40 UTC
[incubator-pinot] branch master updated: Cancel running Kinesis
consumer tasks when timeout occurs (#7109)
This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d4d0b93 Cancel running Kinesis consumer tasks when timeout occurs (#7109)
d4d0b93 is described below
commit d4d0b93f246aa32f2e1a93d5588e148ed4f3cf75
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Jul 2 10:45:28 2021 +0530
Cancel running Kinesis consumer tasks when timeout occurs (#7109)
* Cancel tasks when timeout occurs
* Formatting changes and change the decoder to new class
---
.../plugin/stream/kinesis/KinesisConsumer.java | 26 ++++++++++++++++++++++
.../plugin/stream/kinesis/KinesisConsumerTest.java | 5 ++---
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 1feac67..78719e8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -28,10 +28,12 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.AbortedException;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
@@ -85,6 +87,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
try {
return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ kinesisFetchResultFuture.cancel(true);
+ return handleException((KinesisPartitionGroupOffset) startCheckpoint, recordList);
} catch (Exception e) {
return handleException((KinesisPartitionGroupOffset) startCheckpoint, recordList);
}
@@ -140,11 +145,16 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
if (getRecordsResponse.hasChildShards() && !getRecordsResponse.childShards().isEmpty()) {
//This statement returns true only when end of current shard has reached.
+ // hasChildShards only checks if the childShard is null and is a valid instance.
isEndOfShard = true;
break;
}
shardIterator = getRecordsResponse.nextShardIterator();
+
+ if (Thread.interrupted()) {
+ break;
+ }
}
return new KinesisRecordsBatch(recordList, startShardToSequenceNum.getKey(), isEndOfShard);
@@ -164,6 +174,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
} catch (KinesisException e) {
LOGGER.warn("Encountered unknown unrecoverable AWS exception", e);
throw new RuntimeException(e);
+ } catch (AbortedException e) {
+ LOGGER.warn("Task aborted due to exception.", e);
+ return handleException(kinesisStartCheckpoint, recordList);
} catch (Throwable e) {
// non transient errors
LOGGER.error("Unknown fetchRecords exception", e);
@@ -198,5 +211,18 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
@Override
public void close() {
super.close();
+ shutdownAndAwaitTermination();
+ }
+
+ void shutdownAndAwaitTermination() {
+ _executorService.shutdown();
+ try {
+ if (!_executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+ _executorService.shutdownNow();
+ }
+ } catch (InterruptedException ie) {
+ _executorService.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index aacc551..ba82502 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -65,8 +65,7 @@ public class KinesisConsumerTest {
private KinesisConfig getKinesisConfig() {
Map<String, String> props = new HashMap<>();
props.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE);
- props.put(StreamConfigProperties
- .constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
+ props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
STREAM_NAME);
props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
StreamConfig.ConsumerType.LOWLEVEL.toString());
@@ -74,7 +73,7 @@ public class KinesisConsumerTest {
.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
KinesisConsumerFactory.class.getName());
props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS),
- "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
props.put(KinesisConfig.REGION, AWS_REGION);
props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, String.valueOf(MAX_RECORDS_TO_FETCH));
props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org