You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/05/06 16:36:35 UTC
[flink] 01/02: [FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 33ec4f2f97cc726860859ba3503a35a6997f1854
Author: Steven Wu <st...@netflix.com>
AuthorDate: Wed Feb 27 22:19:02 2019 -0800
[FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging
This closes #8315
---
.../connectors/kafka/FlinkKafkaConsumerBase.java | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 6cf1839..d49e3fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -662,6 +662,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
// initialize commit metrics and default offset callback method
this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
+ final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
this.offsetCommitCallback = new KafkaCommitCallback() {
@Override
@@ -671,7 +672,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
@Override
public void onException(Throwable cause) {
- LOG.warn("Async Kafka commit failed.", cause);
+ LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
failedCommits.inc();
}
};
@@ -683,6 +684,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
sourceContext.markAsTemporarilyIdle();
}
+ LOG.info("Consumer subtask {} creating fetcher with offsets {}.",
+ getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);
// from this point forward:
// - 'snapshotState' will draw offsets from the fetcher,
// instead of being built from `subscribedPartitionsToStartOffsets`
@@ -877,9 +880,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
}
- LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
+ LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
} else {
- LOG.info("No restore state for FlinkKafkaConsumer.");
+ LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
}
}
@@ -943,13 +946,15 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// only one commit operation must be in progress
if (LOG.isDebugEnabled()) {
- LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
+ LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
+ getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
}
try {
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
if (posInMap == -1) {
- LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+ LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
+ getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
return;
}
@@ -963,7 +968,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
if (offsets == null || offsets.size() == 0) {
- LOG.debug("Checkpoint state was empty.");
+ LOG.debug("Consumer subtask {} has empty checkpoint state.", getRuntimeContext().getIndexOfThisSubtask());
return;
}