You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/02 21:13:03 UTC

kafka git commit: KAFKA-2358: Cluster collection returning methods never return null

Repository: kafka
Updated Branches:
  refs/heads/trunk 1b902b4ed -> ca06862a7


KAFKA-2358: Cluster collection returning methods never return null

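Cluster.partitionsForTopic, availablePartitionsForTopic and partitionsForNode now return an empty list instead of null when no partitions are known; callers are updated to check isEmpty() (or simply drop the null check) accordingly.

See https://issues.apache.org/jira/browse/KAFKA-2358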

Author: Stevo Slavic <ss...@gmail.com>

Reviewers: Jason Gustafson, Guozhang Wang

Closes #96 from sslavic/feature/KAFKA-2358


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca06862a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca06862a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca06862a

Branch: refs/heads/trunk
Commit: ca06862a7005ca476f900bd9c2373021422d695b
Parents: 1b902b4
Author: Stevo Slavic <ss...@gmail.com>
Authored: Thu Mar 2 13:12:59 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 2 13:12:59 2017 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/kafka/clients/Metadata.java    | 2 +-
 .../org/apache/kafka/clients/consumer/KafkaConsumer.java    | 2 +-
 .../org/apache/kafka/clients/producer/MockProducer.java     | 2 +-
 clients/src/main/java/org/apache/kafka/common/Cluster.java  | 9 ++++++---
 .../test/java/org/apache/kafka/clients/MetadataTest.java    | 2 +-
 .../kafka/streams/processor/DefaultPartitionGrouper.java    | 4 ++--
 .../processor/internals/StreamPartitionAssignor.java        | 2 +-
 .../streams/processor/internals/StreamsMetadataState.java   | 2 +-
 8 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 65da330..87e5862 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -329,7 +329,7 @@ public final class Metadata {
 
             for (String topic : this.topics.keySet()) {
                 List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
-                if (partitionInfoList != null) {
+                if (!partitionInfoList.isEmpty()) {
                     partitionInfos.addAll(partitionInfoList);
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 63a39fa..1e37497 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1342,7 +1342,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         try {
             Cluster cluster = this.metadata.fetch();
             List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
-            if (parts != null)
+            if (!parts.isEmpty())
                 return parts;
 
             Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index bafb048..35f5d94 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -114,7 +114,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
     @Override
     public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         int partition = 0;
-        if (this.cluster.partitionsForTopic(record.topic()) != null)
+        if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
             partition = partition(record, this.cluster);
         TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
         ProduceRequestResult result = new ProduceRequestResult(topicPartition);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index b7408e3..ba1d2af 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -207,7 +207,8 @@ public final class Cluster {
      * @return A list of partitions
      */
     public List<PartitionInfo> partitionsForTopic(String topic) {
-        return this.partitionsByTopic.get(topic);
+        List<PartitionInfo> parts = this.partitionsByTopic.get(topic);
+        return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
     }
 
     /**
@@ -226,7 +227,8 @@ public final class Cluster {
      * @return A list of partitions
      */
     public List<PartitionInfo> availablePartitionsForTopic(String topic) {
-        return this.availablePartitionsByTopic.get(topic);
+        List<PartitionInfo> parts = this.availablePartitionsByTopic.get(topic);
+        return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
     }
 
     /**
@@ -235,7 +237,8 @@ public final class Cluster {
      * @return A list of partitions
      */
     public List<PartitionInfo> partitionsForNode(int nodeId) {
-        return this.partitionsByNode.get(nodeId);
+        List<PartitionInfo> parts = this.partitionsByNode.get(nodeId);
+        return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 1a05abc..084ccd8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -412,7 +412,7 @@ public class MetadataTest {
     private Thread asyncFetch(final String topic, final long maxWaitMs) {
         Thread thread = new Thread() {
             public void run() {
-                while (metadata.fetch().partitionsForTopic(topic) == null) {
+                while (metadata.fetch().partitionsForTopic(topic).isEmpty()) {
                     try {
                         metadata.awaitUpdate(metadata.requestUpdate(), maxWaitMs);
                     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 5e4da4b..19e4809 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -62,7 +62,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
 
                 for (String topic : topicGroup) {
                     List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
-                    if (partitions != null && partitionId < partitions.size()) {
+                    if (partitionId < partitions.size()) {
                         group.add(new TopicPartition(topic, partitionId));
                     }
                 }
@@ -81,7 +81,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
         for (String topic : topics) {
             List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
 
-            if (partitions == null) {
+            if (partitions.isEmpty()) {
                 log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic);
                 return StreamPartitionAssignor.NOT_AVAILABLE;
             } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 21b9109..e3f6698 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -406,7 +406,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
         for (String topic : allSourceTopics) {
             List<PartitionInfo> partitionInfoList = metadataWithInternalTopics.partitionsForTopic(topic);
-            if (partitionInfoList != null) {
+            if (!partitionInfoList.isEmpty()) {
                 for (PartitionInfo partitionInfo : partitionInfoList) {
                     TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                     if (!allAssignedPartitions.contains(partition)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 67a26bf..bb74b48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -279,7 +279,7 @@ public class StreamsMetadataState {
             this.sourceTopics = sourceTopics;
             for (String topic : sourceTopics) {
                 final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic);
-                if (partitions != null && partitions.size() > maxPartitions) {
+                if (partitions.size() > maxPartitions) {
                     maxPartitions = partitions.size();
                     topicWithMostPartitions = partitions.get(0).topic();
                 }