You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/03 11:53:25 UTC
kafka git commit: KAFKA-3324; NullPointerException in StreamPartitionAssignor
Repository: kafka
Updated Branches:
refs/heads/trunk b9eda22d7 -> 10394aa80
KAFKA-3324; NullPointerException in StreamPartitionAssignor
Author: Michael G. Noll <mi...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1001 from miguno/KAFKA-3324
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10394aa8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10394aa8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10394aa8
Branch: refs/heads/trunk
Commit: 10394aa80153db27e7672a949c91b436724b0ead
Parents: b9eda22
Author: Michael G. Noll <mi...@confluent.io>
Authored: Thu Mar 3 02:53:12 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Mar 3 02:53:12 2016 -0800
----------------------------------------------------------------------
.../internals/StreamPartitionAssignor.java | 30 +++++++++++---------
1 file changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/10394aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 55cbb0e..7d89573 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -229,24 +229,26 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
if (internalTopicManager != null) {
log.debug("Starting to validate internal source topics in partition assignor.");
- for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
- String topic = streamThread.jobId + "-" + entry.getKey();
+ if (internalSourceTopicToTaskIds != null) {
+ for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
+ String topic = streamThread.jobId + "-" + entry.getKey();
- // should have size 1 only
- int numPartitions = -1;
- for (TaskId task : entry.getValue()) {
- numPartitions = task.partition;
- }
+ // should have size 1 only
+ int numPartitions = -1;
+ for (TaskId task : entry.getValue()) {
+ numPartitions = task.partition;
+ }
- internalTopicManager.makeReady(topic, numPartitions);
+ internalTopicManager.makeReady(topic, numPartitions);
- // wait until the topic metadata has been propagated to all brokers
- List<PartitionInfo> partitions;
- do {
- partitions = streamThread.restoreConsumer.partitionsFor(topic);
- } while (partitions == null || partitions.size() != numPartitions);
+ // wait until the topic metadata has been propagated to all brokers
+ List<PartitionInfo> partitions;
+ do {
+ partitions = streamThread.restoreConsumer.partitionsFor(topic);
+ } while (partitions == null || partitions.size() != numPartitions);
- metadata.update(topic, partitions);
+ metadata.update(topic, partitions);
+ }
}
log.info("Completed validating internal source topics in partition assignor.");