You are viewing a plain text version of this content. The canonical version with its original link is available in the mailing list archive.
Posted to commits@flink.apache.org by tz...@apache.org on 2017/03/25 06:10:20 UTC
flink git commit: [hotfix] [kafka] Add log info for partitions
subscribed by FlinkKafkaConsumer subtasks
Repository: flink
Updated Branches:
refs/heads/release-1.2 c6a807250 -> 75db91e62
[hotfix] [kafka] Add log info for partitions subscribed by FlinkKafkaConsumer subtasks
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75db91e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75db91e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75db91e6
Branch: refs/heads/release-1.2
Commit: 75db91e62342db1be15020d41c3e23f15b2aca36
Parents: c6a8072
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
Authored: Sat Mar 25 14:03:03 2017 +0800
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Sat Mar 25 14:07:17 2017 +0800
----------------------------------------------------------------------
.../kafka/FlinkKafkaConsumerBase.java | 23 ++++++++++----------
1 file changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/75db91e6/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index bfc347f..bdeb478 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -324,10 +324,8 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
restoreToOffset.put(kafkaOffset.f0, kafkaOffset.f1);
}
- LOG.info("Setting restore state in the FlinkKafkaConsumer.");
- if (LOG.isDebugEnabled()) {
- LOG.debug("Using the following offsets: {}", restoreToOffset);
- }
+ LOG.info("Setting restore state in the FlinkKafkaConsumer for consumer subtask {}: {}",
+ getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
} else if (restoreToOffset.isEmpty()) {
restoreToOffset = null;
}
@@ -387,15 +385,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
@Override
public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
- LOG.info("{} (taskIdx={}) restoring offsets from an older version.",
- getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask());
+ LOG.info("{} (taskIdx={}) restoring offsets from an older version: {}",
+ getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoredOffsets);
restoreToOffset = restoredOffsets;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} (taskIdx={}) restored offsets from an older Flink version: {}",
- getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), restoreToOffset);
- }
}
@Override
@@ -492,6 +485,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
for (Map.Entry<KafkaTopicPartition, Long> restoredPartitionState : restoreToOffset.entrySet()) {
subscribedPartitions.add(restoredPartitionState.getKey());
}
+
+ LOG.info("Consumer subtask {} will use the partitions in restored state as subscribed partitions: {}",
+ getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitions);
} else {
List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
@@ -508,11 +504,16 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
}
});
+ LOG.info("Fetched a total of {} kafka partitions: {}", kafkaTopicPartitions.size(), kafkaTopicPartitions);
+
subscribedPartitions = new ArrayList<>(
(kafkaTopicPartitions.size() / getRuntimeContext().getNumberOfParallelSubtasks()) + 1);
for (int i = getRuntimeContext().getIndexOfThisSubtask(); i < kafkaTopicPartitions.size(); i += getRuntimeContext().getNumberOfParallelSubtasks()) {
subscribedPartitions.add(kafkaTopicPartitions.get(i));
}
+
+ LOG.info("Consumer subtask {} will subscribe to {} partitions: {}",
+ getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitions.size(), subscribedPartitions);
}
}