You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/05/06 16:36:35 UTC

[flink] 01/02: [FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 33ec4f2f97cc726860859ba3503a35a6997f1854
Author: Steven Wu <st...@netflix.com>
AuthorDate: Wed Feb 27 22:19:02 2019 -0800

    [FLINK-12368] [Kafka] Add subtask index to FlinkKafkaConsumerBase logging
    
    This closes #8315
---
 .../connectors/kafka/FlinkKafkaConsumerBase.java        | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 6cf1839..d49e3fb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -662,6 +662,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		// initialize commit metrics and default offset callback method
 		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
 		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
+		final int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
 
 		this.offsetCommitCallback = new KafkaCommitCallback() {
 			@Override
@@ -671,7 +672,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 			@Override
 			public void onException(Throwable cause) {
-				LOG.warn("Async Kafka commit failed.", cause);
+				LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);
 				failedCommits.inc();
 			}
 		};
@@ -683,6 +684,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			sourceContext.markAsTemporarilyIdle();
 		}
 
+		LOG.info("Consumer subtask {} creating fetcher with offsets {}.",
+			getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets);
 		// from this point forward:
 		//   - 'snapshotState' will draw offsets from the fetcher,
 		//     instead of being built from `subscribedPartitionsToStartOffsets`
@@ -877,9 +880,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
 			}
 
-			LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
+			LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
 		} else {
-			LOG.info("No restore state for FlinkKafkaConsumer.");
+			LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
 		}
 	}
 
@@ -943,13 +946,15 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 		if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
 			// only one commit operation must be in progress
 			if (LOG.isDebugEnabled()) {
-				LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
+				LOG.debug("Consumer subtask {} committing offsets to Kafka/ZooKeeper for checkpoint {}.",
+					getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
 			}
 
 			try {
 				final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
 				if (posInMap == -1) {
-					LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
+					LOG.warn("Consumer subtask {} received confirmation for unknown checkpoint id {}",
+						getRuntimeContext().getIndexOfThisSubtask(), checkpointId);
 					return;
 				}
 
@@ -963,7 +968,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				}
 
 				if (offsets == null || offsets.size() == 0) {
-					LOG.debug("Checkpoint state was empty.");
+					LOG.debug("Consumer subtask {} has empty checkpoint state.", getRuntimeContext().getIndexOfThisSubtask());
 					return;
 				}