You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/25 06:10:20 UTC

flink git commit: [hotfix] [kafka] Add log info for partitions subscribed by FlinkKafkaConsumer subtasks

Repository: flink
Updated Branches:
  refs/heads/release-1.2 c6a807250 -> 75db91e62


[hotfix] [kafka] Add log info for partitions subscribed by FlinkKafkaConsumer subtasks


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75db91e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75db91e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75db91e6

Branch: refs/heads/release-1.2
Commit: 75db91e62342db1be15020d41c3e23f15b2aca36
Parents: c6a8072
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sat Mar 25 14:03:03 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Mar 25 14:07:17 2017 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaConsumerBase.java           | 23 ++++++++++----------
 1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75db91e6/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index bfc347f..bdeb478 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -324,10 +324,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 					restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
 				}
 
-				LOG.info("Setting restore state in the FlinkKafkaConsumer.");
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Using the following offsets: {}", restoreToOffset);
-				}
+				LOG.info("Setting restore state in the FlinkKafkaConsumer for consumer subtask {}: {}",
+					getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
 			} else if (restoreToOffset.isEmpty()) {
 				restoreToOffset = null;
 			}
@@ -387,15 +385,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 
 	@Override
 	public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
-		LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
-			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+		LOG.info("{} (taskIdx={}) restoring offsets from an older version: {}",
+			getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredOffsets);
 
 		restoreToOffset = restoredOffsets;
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
-				getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
-		}
 	}
 
 	@Override
@@ -492,6 +485,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 			for (Map.Entry<KafkaTopicPartition, Long> restoredPartitionState : restoreToOffset.entrySet()) {
 				subscribedPartitions.add(restoredPartitionState.getKey());
 			}
+
+			LOG.info("Consumer subtask {} will use the partitions in restored state as subscribed partitions: {}",
+				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitions);
 		} else {
 			List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
 
@@ -508,11 +504,16 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				}
 			});
 
+			LOG.info("Fetched a total of {} kafka partitions: {}", kafkaTopicPartitions.size(), kafkaTopicPartitions);
+
 			subscribedPartitions = new ArrayList<>(
 				(kafkaTopicPartitions.size() / getRuntimeContext().getNumberOfParallelSubtasks()) + 1);
 			for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) {
 				subscribedPartitions.add(kafkaTopicPartitions.get(i));
 			}
+
+			LOG.info("Consumer subtask {} will subscribe to {} partitions: {}",
+				getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitions.size(), subscribedPartitions);
 		}
 	}