You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by kh...@apache.org on 2021/07/02 05:15:40 UTC

[incubator-pinot] branch master updated: Cancel running Kinesis consumer tasks when timeout occurs (#7109)

This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d4d0b93  Cancel running Kinesis consumer tasks when timeout occurs (#7109)
d4d0b93 is described below

commit d4d0b93f246aa32f2e1a93d5588e148ed4f3cf75
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Fri Jul 2 10:45:28 2021 +0530

    Cancel running Kinesis consumer tasks when timeout occurs (#7109)
    
    * Cancel tasks when timeout occurs
    
    * Formatting changes and change the decoder to new class
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 26 ++++++++++++++++++++++
 .../plugin/stream/kinesis/KinesisConsumerTest.java |  5 ++---
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 1feac67..78719e8 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -28,10 +28,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.core.exception.AbortedException;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
@@ -85,6 +87,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
 
     try {
       return kinesisFetchResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException e) {
+      kinesisFetchResultFuture.cancel(true);
+      return handleException((KinesisPartitionGroupOffset) startCheckpoint, recordList);
     } catch (Exception e) {
       return handleException((KinesisPartitionGroupOffset) startCheckpoint, recordList);
     }
@@ -140,11 +145,16 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
 
         if (getRecordsResponse.hasChildShards() && !getRecordsResponse.childShards().isEmpty()) {
           //This statement returns true only when end of current shard has reached.
+          // hasChildShards only checks if the childShard is null and is a valid instance.
           isEndOfShard = true;
           break;
         }
 
         shardIterator = getRecordsResponse.nextShardIterator();
+
+        if (Thread.interrupted()) {
+          break;
+        }
       }
 
       return new KinesisRecordsBatch(recordList, startShardToSequenceNum.getKey(), isEndOfShard);
@@ -164,6 +174,9 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     } catch (KinesisException e) {
       LOGGER.warn("Encountered unknown unrecoverable AWS exception", e);
       throw new RuntimeException(e);
+    } catch (AbortedException e) {
+      LOGGER.warn("Task aborted due to exception.", e);
+      return handleException(kinesisStartCheckpoint, recordList);
     } catch (Throwable e) {
       // non transient errors
       LOGGER.error("Unknown fetchRecords exception", e);
@@ -198,5 +211,18 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
   @Override
   public void close() {
     super.close();
+    shutdownAndAwaitTermination();
+  }
+
+  void shutdownAndAwaitTermination() {
+    _executorService.shutdown();
+    try {
+      if (!_executorService.awaitTermination(60, TimeUnit.SECONDS)) {
+        _executorService.shutdownNow();
+      }
+    } catch (InterruptedException ie) {
+      _executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index aacc551..ba82502 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -65,8 +65,7 @@ public class KinesisConsumerTest {
   private KinesisConfig getKinesisConfig() {
     Map<String, String> props = new HashMap<>();
     props.put(StreamConfigProperties.STREAM_TYPE, STREAM_TYPE);
-    props.put(StreamConfigProperties
-            .constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
+    props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
         STREAM_NAME);
     props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
         StreamConfig.ConsumerType.LOWLEVEL.toString());
@@ -74,7 +73,7 @@ public class KinesisConsumerTest {
             .constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
         KinesisConsumerFactory.class.getName());
     props.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS),
-        "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+        "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
     props.put(KinesisConfig.REGION, AWS_REGION);
     props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, String.valueOf(MAX_RECORDS_TO_FETCH));
     props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org