You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/06/07 19:11:54 UTC
[pinot] branch master updated: Cleanup Kinesis Logs (#8787)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 041b62f079 Cleanup Kinesis Logs (#8787)
041b62f079 is described below
commit 041b62f079eb5d2eaef380abcef2961e176fee8d
Author: Kartik Khare <kh...@gmail.com>
AuthorDate: Wed Jun 8 00:41:49 2022 +0530
Cleanup Kinesis Logs (#8787)
---
.../pinot/plugin/stream/kinesis/KinesisConsumer.java | 18 +++++++++++++-----
1 file changed, 13 insertions(+), 5 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 7f2557e93a..ccc9b7ee66 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -159,23 +159,23 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
return new KinesisRecordsBatch(recordList, startShardToSequenceNum.getKey(), isEndOfShard);
} catch (IllegalStateException e) {
- LOGGER.warn("Illegal state exception, connection is broken", e);
+ debugOrLogWarning("Illegal state exception, connection is broken", e);
return handleException(kinesisStartCheckpoint, recordList);
} catch (ProvisionedThroughputExceededException e) {
- LOGGER.warn("The request rate for the stream is too high", e);
+ debugOrLogWarning("The request rate for the stream is too high", e);
return handleException(kinesisStartCheckpoint, recordList);
} catch (ExpiredIteratorException e) {
- LOGGER.warn("ShardIterator expired while trying to fetch records", e);
+ debugOrLogWarning("ShardIterator expired while trying to fetch records", e);
return handleException(kinesisStartCheckpoint, recordList);
} catch (ResourceNotFoundException | InvalidArgumentException e) {
// aws errors
LOGGER.error("Encountered AWS error while attempting to fetch records", e);
return handleException(kinesisStartCheckpoint, recordList);
} catch (KinesisException e) {
- LOGGER.warn("Encountered unknown unrecoverable AWS exception", e);
+ debugOrLogWarning("Encountered unknown unrecoverable AWS exception", e);
throw new RuntimeException(e);
} catch (AbortedException e) {
- LOGGER.warn("Task aborted due to exception.", e);
+ debugOrLogWarning("Task aborted due to exception", e);
return handleException(kinesisStartCheckpoint, recordList);
} catch (Throwable e) {
// non transient errors
@@ -184,6 +184,14 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Partiti
}
}
+ private void debugOrLogWarning(String message, Throwable throwable) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(message, throwable);
+ } else {
+ LOGGER.warn(message + ": " + throwable.getMessage());
+ }
+ }
+
private KinesisRecordsBatch handleException(KinesisPartitionGroupOffset start, List<Record> recordList) {
String shardId = start.getShardToStartSequenceMap().entrySet().iterator().next().getKey();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org