You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/23 17:08:44 UTC
[kafka] branch trunk updated: KAFKA-9651: Fix ArithmeticException (÷ by 0) in DefaultStreamPartitioner (#8226)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6cfed8a KAFKA-9651: Fix ArithmeticException (÷ by 0) in DefaultStreamPartitioner (#8226)
6cfed8a is described below
commit 6cfed8ad0061cdb2c71df03001cbd485491d6dfa
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Mon Mar 23 17:07:54 2020 +0000
KAFKA-9651: Fix ArithmeticException (÷ by 0) in DefaultStreamPartitioner (#8226)
In Streams `StreamsMetadataState.getMetadataWithKey`, we should use the inferred max topic partitions passed in directly from the caller rather than relying on the cluster to contain its topic-partition information.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../producer/internals/DefaultPartitioner.java | 26 ++++++++++++++++------
.../internals/DefaultStreamPartitioner.java | 2 +-
2 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index 85c5e4e..cf765d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -16,14 +16,12 @@
*/
package org.apache.kafka.clients.producer.internals;
-import java.util.List;
-import java.util.Map;
-
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
+import java.util.Map;
+
/**
* The default partitioning strategy:
* <ul>
@@ -50,11 +48,25 @@ public class DefaultPartitioner implements Partitioner {
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+ return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
+ }
+
+ /**
+ * Compute the partition for the given record.
+ *
+ * @param topic The topic name
+ * @param numPartitions The number of partitions of the given {@code topic}
+ * @param key The key to partition on (or null if no key)
+ * @param keyBytes serialized key to partition on (or null if no key)
+ * @param value The value to partition on or null
+ * @param valueBytes serialized value to partition on or null
+ * @param cluster The current cluster metadata
+ */
+ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
+ int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
- }
- List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
- int numPartitions = partitions.size();
+ }
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
index cb64c3a..a90a028 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -36,6 +36,6 @@ public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
@Override
public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
final byte[] keyBytes = keySerializer.serialize(topic, key);
- return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster);
+ return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster, numPartitions);
}
}