You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/03 11:53:25 UTC

kafka git commit: KAFKA-3324; NullPointerException in StreamPartitionAssignor

Repository: kafka
Updated Branches:
  refs/heads/trunk b9eda22d7 -> 10394aa80


KAFKA-3324; NullPointerException in StreamPartitionAssignor

Author: Michael G. Noll <mi...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1001 from miguno/KAFKA-3324


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10394aa8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10394aa8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10394aa8

Branch: refs/heads/trunk
Commit: 10394aa80153db27e7672a949c91b436724b0ead
Parents: b9eda22
Author: Michael G. Noll <mi...@confluent.io>
Authored: Thu Mar 3 02:53:12 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Mar 3 02:53:12 2016 -0800

----------------------------------------------------------------------
 .../internals/StreamPartitionAssignor.java      | 30 +++++++++++---------
 1 file changed, 16 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10394aa8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 55cbb0e..7d89573 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -229,24 +229,26 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         if (internalTopicManager != null) {
             log.debug("Starting to validate internal source topics in partition assignor.");
 
-            for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
-                String topic = streamThread.jobId + "-" + entry.getKey();
+            if (internalSourceTopicToTaskIds != null) {
+                for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) {
+                    String topic = streamThread.jobId + "-" + entry.getKey();
 
-                // should have size 1 only
-                int numPartitions = -1;
-                for (TaskId task : entry.getValue()) {
-                    numPartitions = task.partition;
-                }
+                    // should have size 1 only
+                    int numPartitions = -1;
+                    for (TaskId task : entry.getValue()) {
+                        numPartitions = task.partition;
+                    }
 
-                internalTopicManager.makeReady(topic, numPartitions);
+                    internalTopicManager.makeReady(topic, numPartitions);
 
-                // wait until the topic metadata has been propagated to all brokers
-                List<PartitionInfo> partitions;
-                do {
-                    partitions = streamThread.restoreConsumer.partitionsFor(topic);
-                } while (partitions == null || partitions.size() != numPartitions);
+                    // wait until the topic metadata has been propagated to all brokers
+                    List<PartitionInfo> partitions;
+                    do {
+                        partitions = streamThread.restoreConsumer.partitionsFor(topic);
+                    } while (partitions == null || partitions.size() != numPartitions);
 
-                metadata.update(topic, partitions);
+                    metadata.update(topic, partitions);
+                }
             }
 
             log.info("Completed validating internal source topics in partition assignor.");