You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/07 19:11:54 UTC

[pinot] branch master updated: Cleanup Kinesis Logs (#8787)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 041b62f079 Cleanup Kinesis Logs (#8787)
041b62f079 is described below

commit 041b62f079eb5d2eaef380abcef2961e176fee8d
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Jun 8 00:41:49 2022 +0530

    Cleanup Kinesis Logs (#8787)
---
 .../pinot/plugin/stream/kinesis/KinesisConsumer.java   | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7f2557e93a..ccc9b7ee66 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -159,23 +159,23 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
 
       return new KinesisRecordsBatch(recordList, startShardToSequenceNum.getKey(), isEndOfShard);
     } catch (IllegalStateException e) {
-      LOGGER.warn("Illegal state exception, connection is broken", e);
+      debugOrLogWarning("Illegal state exception, connection is broken", e);
       return handleException(kinesisStartCheckpoint, recordList);
     } catch (ProvisionedThroughputExceededException e) {
-      LOGGER.warn("The request rate for the stream is too high", e);
+      debugOrLogWarning("The request rate for the stream is too high", e);
       return handleException(kinesisStartCheckpoint, recordList);
     } catch (ExpiredIteratorException e) {
-      LOGGER.warn("ShardIterator expired while trying to fetch records", e);
+      debugOrLogWarning("ShardIterator expired while trying to fetch records", e);
       return handleException(kinesisStartCheckpoint, recordList);
     } catch (ResourceNotFoundException | InvalidArgumentException e) {
       // aws errors
       LOGGER.error("Encountered AWS error while attempting to fetch records", e);
       return handleException(kinesisStartCheckpoint, recordList);
     } catch (KinesisException e) {
-      LOGGER.warn("Encountered unknown unrecoverable AWS exception", e);
+      debugOrLogWarning("Encountered unknown unrecoverable AWS exception", e);
       throw new RuntimeException(e);
     } catch (AbortedException e) {
-      LOGGER.warn("Task aborted due to exception.", e);
+      debugOrLogWarning("Task aborted due to exception", e);
       return handleException(kinesisStartCheckpoint, recordList);
     } catch (Throwable e) {
       // non transient errors
@@ -184,6 +184,14 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
     }
   }
 
+  private void debugOrLogWarning(String message, Throwable throwable) {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(message, throwable);
+    } else {
+      LOGGER.warn(message + ": " + throwable.getMessage());
+    }
+  }
+
   private KinesisRecordsBatch handleException(KinesisPartitionGroupOffset start, List<Record> recordList) {
     String shardId = start.getShardToStartSequenceMap().entrySet().iterator().next().getKey();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org