You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/23 17:08:44 UTC

[kafka] branch trunk updated: KAFKA-9651: Fix ArithmeticException (÷ by 0) in DefaultStreamPartitioner (#8226)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6cfed8a  KAFKA-9651: Fix ArithmeticException (÷ by 0) in DefaultStreamPartitioner (#8226)
6cfed8a is described below

commit 6cfed8ad0061cdb2c71df03001cbd485491d6dfa
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Mon Mar 23 17:07:54 2020 +0000

    KAFKA-9651: Fix ArithmeticException (÷ by 0) in DefaultStreamPartitioner (#8226)
    
    In Streams `StreamsMetadataState.getMetadataWithKey`, we should use the inferred max topic partitions passed in directly from the caller than relying on cluster to contain its topic-partition information.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../producer/internals/DefaultPartitioner.java     | 26 ++++++++++++++++------
 .../internals/DefaultStreamPartitioner.java        |  2 +-
 2 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index 85c5e4e..cf765d1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -16,14 +16,12 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
-import java.util.List;
-import java.util.Map;
-
 import org.apache.kafka.clients.producer.Partitioner;
 import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.utils.Utils;
 
+import java.util.Map;
+
 /**
  * The default partitioning strategy:
  * <ul>
@@ -50,11 +48,25 @@ public class DefaultPartitioner implements Partitioner {
      * @param cluster The current cluster metadata
      */
     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
+    }
+
+    /**
+     * Compute the partition for the given record.
+     *
+     * @param topic The topic name
+     * @param numPartitions The number of partitions of the given {@code topic}
+     * @param key The key to partition on (or null if no key)
+     * @param keyBytes serialized key to partition on (or null if no key)
+     * @param value The value to partition on or null
+     * @param valueBytes serialized value to partition on or null
+     * @param cluster The current cluster metadata
+     */
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
+                         int numPartitions) {
         if (keyBytes == null) {
             return stickyPartitionCache.partition(topic, cluster);
-        } 
-        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
-        int numPartitions = partitions.size();
+        }
         // hash the keyBytes to choose a partition
         return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
index cb64c3a..a90a028 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -36,6 +36,6 @@ public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
     @Override
     public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
         final byte[] keyBytes = keySerializer.serialize(topic, key);
-        return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster);
+        return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster, numPartitions);
     }
 }