You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/06/18 03:17:10 UTC
[kafka] branch trunk updated: KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cfdd567955 KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304)
cfdd567955 is described below
commit cfdd567955588e134770a9145ba57800ca88313c
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Fri Jun 17 20:17:02 2022 -0700
KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner (#12304)
There are some considerations embedded in this seemingly straightforward PR that I'd like to explain here. The StreamPartitioner is used to send records to three types of topics:
1) repartition topics, where key should never be null.
2) changelog topics, where key should never be null.
3) sink topics, where only non-windowed keys can be null; windowed keys should still never be null.
Also, the StreamPartitioner is used by Interactive Queries (IQ) to determine which host contains a certain key, following the same logic as case 2) above.
The main goal of this PR is to stop using the producer's deprecated default partitioner, while keeping the following in mind:
For non-null keys, we want to make sure the default murmur2 hash behavior of the Streams partitioner stays consistent with the producer's new built-in partitioner.
For null keys (which are only possible with the non-windowed default stream partitioner, and are never used for IQ), we fix the issue that we may never rotate to a new partition: the partitioner now returns a null partition, so the choice is left to the producer's newly introduced built-in partitioner.
Reviewers: Artem Livshits <84...@users.noreply.github.com>, Matthias J. Sax <ma...@confluent.io>
---
.../kafka/clients/producer/KafkaProducer.java | 3 ++-
.../producer/internals/BuiltInPartitioner.java | 7 +++++++
.../producer/internals/DefaultPartitioner.java | 4 +---
.../internals/WindowedStreamPartitioner.java | 10 +++++-----
.../internals/DefaultStreamPartitioner.java | 22 ++++++++++++----------
.../processor/internals/StreamsMetadataState.java | 4 ++--
6 files changed, 29 insertions(+), 21 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index e85d9eb8a9..74d408d9a5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.producer.internals.BufferPool;
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.clients.producer.internals.KafkaProducerMetrics;
import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
import org.apache.kafka.clients.producer.internals.ProducerMetadata;
@@ -1385,7 +1386,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
- return Utils.toPositive(Utils.murmur2(serializedKey)) % cluster.partitionsForTopic(record.topic()).size();
+ return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
return RecordMetadata.UNKNOWN_PARTITION;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
index 1c2d10f3f6..a5805df56b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java
@@ -279,6 +279,13 @@ public class BuiltInPartitioner {
}
}
+ /*
+ * Default hashing function to choose a partition from the serialized key bytes
+ */
+ public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
+ return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+ }
+
/**
* The partition load stats for each topic that are used for adaptive partition distribution.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
index 2c2e79fb20..716773626c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.utils.Utils;
import java.util.Map;
@@ -71,8 +70,7 @@ public class DefaultPartitioner implements Partitioner {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
- // hash the keyBytes to choose a partition
- return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+ return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}
public void close() {}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 8e1476a7ed..d68a52b8d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -16,12 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import static org.apache.kafka.common.utils.Utils.toPositive;
-
public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
private final WindowedSerializer<K> serializer;
@@ -43,9 +41,11 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Window
*/
@Override
public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) {
+ // for windowed key, the key bytes should never be null
final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey);
- // hash the keyBytes to choose a partition
- return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+ // stick with the same built-in partitioner util functions that producer used
+ // to make sure its behavior is consistent with the producer
+ return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
index f5c9c158bc..c7d909c65a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -15,28 +15,30 @@
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.Cluster;
+
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
- private final Cluster cluster;
private final Serializer<K> keySerializer;
- @SuppressWarnings("deprecation")
- private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner;
-
- @SuppressWarnings("deprecation")
- public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
- this.cluster = cluster;
+ public DefaultStreamPartitioner(final Serializer<K> keySerializer) {
this.keySerializer = keySerializer;
- this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();
}
@Override
public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
final byte[] keyBytes = keySerializer.serialize(topic, key);
- return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster, numPartitions);
+
+ // if the key bytes are not available, we just return null to let the producer to decide
+ // which partition to send internally; otherwise stick with the same built-in partitioner
+ // util functions that producer used to make sure its behavior is consistent with the producer
+ if (keyBytes == null) {
+ return null;
+ } else {
+ return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
+ }
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index e8ee3eacf6..6850715b0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -212,7 +212,7 @@ public class StreamsMetadataState {
}
return getKeyQueryMetadataForKey(storeName,
key,
- new DefaultStreamPartitioner<>(keySerializer, clusterMetadata));
+ new DefaultStreamPartitioner<>(keySerializer));
}
/**
@@ -225,7 +225,7 @@ public class StreamsMetadataState {
Objects.requireNonNull(keySerializer, "keySerializer can't be null");
return getKeyQueryMetadataForKey(storeName,
key,
- new DefaultStreamPartitioner<>(keySerializer, clusterMetadata),
+ new DefaultStreamPartitioner<>(keySerializer),
topologyName);
}