You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/02 21:13:03 UTC
kafka git commit: KAFKA-2358: Cluster collection returning methods never return null
Repository: kafka
Updated Branches:
refs/heads/trunk 1b902b4ed -> ca06862a7
KAFKA-2358: Cluster collection returning methods never return null
See https://issues.apache.org/jira/browse/KAFKA-2358
Author: Stevo Slavic <ss...@gmail.com>
Reviewers: Jason Gustafson, Guozhang Wang
Closes #96 from sslavic/feature/KAFKA-2358
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ca06862a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ca06862a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ca06862a
Branch: refs/heads/trunk
Commit: ca06862a7005ca476f900bd9c2373021422d695b
Parents: 1b902b4
Author: Stevo Slavic <ss...@gmail.com>
Authored: Thu Mar 2 13:12:59 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Mar 2 13:12:59 2017 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/kafka/clients/Metadata.java | 2 +-
.../org/apache/kafka/clients/consumer/KafkaConsumer.java | 2 +-
.../org/apache/kafka/clients/producer/MockProducer.java | 2 +-
clients/src/main/java/org/apache/kafka/common/Cluster.java | 9 ++++++---
.../test/java/org/apache/kafka/clients/MetadataTest.java | 2 +-
.../kafka/streams/processor/DefaultPartitionGrouper.java | 4 ++--
.../processor/internals/StreamPartitionAssignor.java | 2 +-
.../streams/processor/internals/StreamsMetadataState.java | 2 +-
8 files changed, 14 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 65da330..87e5862 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -329,7 +329,7 @@ public final class Metadata {
for (String topic : this.topics.keySet()) {
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
- if (partitionInfoList != null) {
+ if (!partitionInfoList.isEmpty()) {
partitionInfos.addAll(partitionInfoList);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 63a39fa..1e37497 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1342,7 +1342,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
try {
Cluster cluster = this.metadata.fetch();
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
- if (parts != null)
+ if (!parts.isEmpty())
return parts;
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index bafb048..35f5d94 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -114,7 +114,7 @@ public class MockProducer<K, V> implements Producer<K, V> {
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
int partition = 0;
- if (this.cluster.partitionsForTopic(record.topic()) != null)
+ if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
partition = partition(record, this.cluster);
TopicPartition topicPartition = new TopicPartition(record.topic(), partition);
ProduceRequestResult result = new ProduceRequestResult(topicPartition);
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index b7408e3..ba1d2af 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -207,7 +207,8 @@ public final class Cluster {
* @return A list of partitions
*/
public List<PartitionInfo> partitionsForTopic(String topic) {
- return this.partitionsByTopic.get(topic);
+ List<PartitionInfo> parts = this.partitionsByTopic.get(topic);
+ return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
}
/**
@@ -226,7 +227,8 @@ public final class Cluster {
* @return A list of partitions
*/
public List<PartitionInfo> availablePartitionsForTopic(String topic) {
- return this.availablePartitionsByTopic.get(topic);
+ List<PartitionInfo> parts = this.availablePartitionsByTopic.get(topic);
+ return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
}
/**
@@ -235,7 +237,8 @@ public final class Cluster {
* @return A list of partitions
*/
public List<PartitionInfo> partitionsForNode(int nodeId) {
- return this.partitionsByNode.get(nodeId);
+ List<PartitionInfo> parts = this.partitionsByNode.get(nodeId);
+ return (parts == null) ? Collections.<PartitionInfo>emptyList() : parts;
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 1a05abc..084ccd8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -412,7 +412,7 @@ public class MetadataTest {
private Thread asyncFetch(final String topic, final long maxWaitMs) {
Thread thread = new Thread() {
public void run() {
- while (metadata.fetch().partitionsForTopic(topic) == null) {
+ while (metadata.fetch().partitionsForTopic(topic).isEmpty()) {
try {
metadata.awaitUpdate(metadata.requestUpdate(), maxWaitMs);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
index 5e4da4b..19e4809 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java
@@ -62,7 +62,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
for (String topic : topicGroup) {
List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
- if (partitions != null && partitionId < partitions.size()) {
+ if (partitionId < partitions.size()) {
group.add(new TopicPartition(topic, partitionId));
}
}
@@ -81,7 +81,7 @@ public class DefaultPartitionGrouper implements PartitionGrouper {
for (String topic : topics) {
List<PartitionInfo> partitions = metadata.partitionsForTopic(topic);
- if (partitions == null) {
+ if (partitions.isEmpty()) {
log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic);
return StreamPartitionAssignor.NOT_AVAILABLE;
} else {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 21b9109..e3f6698 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -406,7 +406,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
}
for (String topic : allSourceTopics) {
List<PartitionInfo> partitionInfoList = metadataWithInternalTopics.partitionsForTopic(topic);
- if (partitionInfoList != null) {
+ if (!partitionInfoList.isEmpty()) {
for (PartitionInfo partitionInfo : partitionInfoList) {
TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
if (!allAssignedPartitions.contains(partition)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca06862a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 67a26bf..bb74b48 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -279,7 +279,7 @@ public class StreamsMetadataState {
this.sourceTopics = sourceTopics;
for (String topic : sourceTopics) {
final List<PartitionInfo> partitions = clusterMetadata.partitionsForTopic(topic);
- if (partitions != null && partitions.size() > maxPartitions) {
+ if (partitions.size() > maxPartitions) {
maxPartitions = partitions.size();
topicWithMostPartitions = partitions.get(0).topic();
}