Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/16 18:53:48 UTC

[GitHub] [kafka] guozhangwang opened a new pull request, #12304: KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner

guozhangwang opened a new pull request, #12304:
URL: https://github.com/apache/kafka/pull/12304

   There are some considerations embedded in this seemingly straightforward PR that I'd like to explain here. The StreamPartitioner is used to send records to three types of topics:
   
   1) repartition topics, where the key should never be null.
   2) changelog topics, where the key should never be null.
   3) sink topics, where only a non-windowed key can be null; a windowed key should still never be null.
   
   Also, the StreamPartitioner is used as part of interactive queries (IQ) to determine which host contains a certain key, as determined by case 2) above.
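
   To illustrate why the hashing has to stay stable for IQ, here is a minimal sketch of a key-to-host lookup. Everything in it is hypothetical: the class name, the `hostsByPartition` list, and the stand-in hash (which is NOT Kafka's murmur2) are assumptions made only to keep the example dependency-free.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: finding the host that owns a key for interactive queries.
class KeyHostLookupSketch {

    // Stand-in for the real murmur2-based hashing; NOT Kafka's actual algorithm.
    static int partitionFor(final byte[] keyBytes, final int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // hostsByPartition.get(p) is the (hypothetical) host serving partition p.
    static String hostFor(final String key, final List<String> hostsByPartition) {
        final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        final int partition = partitionFor(keyBytes, hostsByPartition.size());
        return hostsByPartition.get(partition);
    }

    public static void main(final String[] args) {
        final List<String> hosts = Arrays.asList("host-a:8080", "host-b:8080", "host-c:8080");
        System.out.println("user-42 -> " + hostFor("user-42", hosts));
    }
}
```

   The lookup only finds the right host if it computes the same partition that the writer chose for the changelog record, which is why both sides need to share one hashing function.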
   
   This PR's main goal is to remove the producer's deprecated default partitioner, keeping the points above in mind, such that:
   
   1) For non-null keys, the default murmur2 hash behavior of the Streams partitioner stays consistent with the producer's new built-in partitioner.
   2) For null keys (which are only possible with the default stream partitioner on non-windowed keys, and are never used for IQ), we fix the issue that we may never rotate to a new partition, by returning `null` as the partition and hence relying on the newly introduced built-in partitioner.
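
   The two cases above can be sketched roughly as follows. This is not the code in the PR: the class name is made up, and the hash is a stand-in for murmur2 so that the example runs without the Kafka jars.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the two cases: keyed records vs. records without key bytes.
class NullKeyPartitionSketch {

    // Stand-in for the shared hashing function; NOT Kafka's murmur2.
    static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
        return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
    }

    // Returns null when there are no key bytes, leaving the choice to the producer;
    // otherwise returns a deterministic partition derived from the key bytes.
    static Integer partition(final byte[] keyBytes, final int numPartitions) {
        if (keyBytes == null) {
            return null;
        }
        return partitionForKey(keyBytes, numPartitions);
    }

    public static void main(final String[] args) {
        final byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println("keyed record   -> " + partition(key, 6));
        System.out.println("unkeyed record -> " + partition(null, 6));
    }
}
```

   Returning `null` instead of picking a partition in Streams means the rotation across partitions for unkeyed records is decided in one place, the producer.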
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang merged pull request #12304: KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner

Posted by GitBox <gi...@apache.org>.
guozhangwang merged PR #12304:
URL: https://github.com/apache/kafka/pull/12304



[GitHub] [kafka] mjsax commented on pull request #12304: KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner

Posted by GitBox <gi...@apache.org>.
mjsax commented on PR #12304:
URL: https://github.com/apache/kafka/pull/12304#issuecomment-1158064650

   There seems to be a checkstyle error:
   ```
   > Task :clients:checkstyleMain
   
   [2022-06-16T18:57:40.716Z] [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-12304/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java:21:8: Unused import - org.apache.kafka.common.utils.Utils. [UnusedImports]
   ```



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12304: KAFKA-13880: Remove DefaultPartitioner from StreamPartitioner

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12304:
URL: https://github.com/apache/kafka/pull/12304#discussion_r899425425


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java:
##########
@@ -279,6 +279,13 @@ public int partition() {
         }
     }
 
+    /*
+     * Default hashing function to choose a partition from the serialized key bytes
+     */
+    public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {

Review Comment:
   The intention of introducing this static function is to make sure that every default partitioner (including the one in Streams) behaves the same way, and that if we ever change the algorithm in the future, we change it for everyone as well.
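
   As a rough illustration of what such a shared function has to get right, here is a sketch of the "hash, force non-negative, take the modulo" step. It assumes the usual form of masking the sign bit before the modulo; the class and method names are hypothetical and the body of the real function is not shown in this hunk.

```java
// Hypothetical sketch: turning a 32-bit hash into a valid partition index.
class SignMaskSketch {

    // Clears the sign bit so the result is never negative.
    static int toPositive(final int number) {
        return number & 0x7fffffff;
    }

    // Maps any hash value, including Integer.MIN_VALUE, into [0, numPartitions).
    static int partitionForHash(final int hash, final int numPartitions) {
        return toPositive(hash) % numPartitions;
    }

    public static void main(final String[] args) {
        final int hash = Integer.MIN_VALUE;
        // Math.abs(Integer.MIN_VALUE) overflows and stays negative,
        // so abs-then-modulo can produce an invalid partition index.
        System.out.println("abs-based:  " + (Math.abs(hash) % 6));
        System.out.println("mask-based: " + partitionForHash(hash, 6));
    }
}
```

   Keeping this detail in a single static function avoids two call sites drifting apart on an edge case like this one.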



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java:
##########
@@ -15,28 +15,30 @@
  * limitations under the License.
  */
 package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.Cluster;
+
+import org.apache.kafka.clients.producer.internals.BuiltInPartitioner;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 
 public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> {
 
-    private final Cluster cluster;
     private final Serializer<K> keySerializer;
 
-    @SuppressWarnings("deprecation")
-    private final org.apache.kafka.clients.producer.internals.DefaultPartitioner defaultPartitioner;
-
-    @SuppressWarnings("deprecation")
-    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) {
-        this.cluster = cluster;
+    public DefaultStreamPartitioner(final Serializer<K> keySerializer) {
         this.keySerializer = keySerializer;
-        this.defaultPartitioner = new org.apache.kafka.clients.producer.internals.DefaultPartitioner();
     }
 
     @Override
     public Integer partition(final String topic, final K key, final V value, final int numPartitions) {
         final byte[] keyBytes = keySerializer.serialize(topic, key);
-        return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster, numPartitions);
+
+        // if the key bytes are not available, we just return null to let the producer to decide
+        // which partition to send internally; otherwise stick with the same built-in partitioner
+        // util functions that producer used to make sure its behavior is consistent with the producer
+        if (keyBytes == null) {

Review Comment:
   This is the intended fix: rely on the new built-in partitioner, under the assumption that a null key would never be used for IQ purposes.


