Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/17 02:45:37 UTC

[GitHub] [kafka] ableegoldman commented on a diff in pull request #12803: KAFKA-13602: Adding ability to multicast records.

ableegoldman commented on code in PR #12803:
URL: https://github.com/apache/kafka/pull/12803#discussion_r1024690215


##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -58,5 +62,24 @@
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
      */
+    @Deprecated
     Integer partition(String topic, K key, V value, int numPartitions);
+
+    /**
+     * Determine the partition numbers to which a record, with the given key and value and the current number
+     * of partitions, should be multi-casted to.
+     * @param topic the topic name this record is sent to
+     * @param key the key of the record
+     * @param value the value of the record
+     * @param numPartitions the total number of partitions
+     * @return an Optional of Set of integers between 0 and {@code numPartitions-1},
+     * Empty optional means using default partitioner
+     * Optional of an empty set means the record won't be sent to any partitions i.e drop it.
+     * Optional of Set of integers means the partitions to which the record should be sent to.
+     * */
+    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
+        final Integer partition = partition(topic, key, value, numPartitions);
+        return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition(topic, key, value, numPartitions)));

Review Comment:
   ```suggestion
           return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
   ```
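
For illustration, here is a minimal sketch of the default method with the single lookup from the suggestion above. `SketchPartitioner` and `CountingPartitioner` are hypothetical stand-ins written for this example, not the real `StreamPartitioner`; the call counter only exists to show that `partition` runs once per record.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for StreamPartitioner; not the real Kafka interface.
interface SketchPartitioner<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);

    // Delegates to partition() exactly once and reuses the result.
    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        final Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? Optional.empty() : Optional.of(Collections.singleton(partition));
    }
}

// Hypothetical implementation that counts how often partition() is invoked.
class CountingPartitioner implements SketchPartitioner<String, String> {
    int calls = 0;

    @Override
    public Integer partition(String topic, String key, String value, int numPartitions) {
        calls++;
        // Assumption for this sketch: a null key means "use the default partitioner".
        return key == null ? null : Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Calling `partition` a second time, as the original line does, costs an extra invocation and could in principle return a different value if a user-supplied partitioner is not deterministic.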



##########
streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java:
##########
@@ -58,5 +62,24 @@
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
      */
+    @Deprecated
     Integer partition(String topic, K key, V value, int numPartitions);
+
+    /**
+     * Determine the partition numbers to which a record, with the given key and value and the current number
+     * of partitions, should be multi-casted to.

Review Comment:
   nit: wording here is a bit awkward, sounds like we're talking about the current number of partitions of the record, not the topic -- maybe something like this?
   ```suggestion
        * Determine the number(s) of the partition(s) to which a record with the given key and value should be sent, 
        * for the given topic and current partition count
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -150,16 +151,29 @@ public <K, V> void send(final String topic,
                 );
             }
             if (partitions.size() > 0) {
-                partition = partitioner.partition(topic, key, value, partitions.size());
+                final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size());
+                if (!maybeMulticastPartitions.isPresent()) {
+                    // New change. Use default partitioner

Review Comment:
   ```suggestion
                       // A null/empty partition indicates we should use the default partitioner
   ```
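
As a rough sketch of the three cases described in the `partitions` Javadoc (empty Optional, empty set, non-empty set), the branching could look like the following. `MulticastDispatchSketch`, `resolveTargets` and the `DEFAULT_PARTITIONER` sentinel are hypothetical names for this example only; the real `RecordCollectorImpl` is structured differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper; it only models the decision, not the actual send.
final class MulticastDispatchSketch {
    // Hypothetical sentinel meaning "let the default partitioner decide".
    static final int DEFAULT_PARTITIONER = -1;

    static List<Integer> resolveTargets(Optional<Set<Integer>> maybeMulticastPartitions) {
        final List<Integer> targets = new ArrayList<>();
        if (!maybeMulticastPartitions.isPresent()) {
            // Empty Optional: fall back to the default partitioner.
            targets.add(DEFAULT_PARTITIONER);
            return targets;
        }
        // Present Optional: an empty set yields no targets (the record is dropped),
        // otherwise the record goes to every listed partition, in sorted order here.
        targets.addAll(new TreeSet<>(maybeMulticastPartitions.get()));
        return targets;
    }
}
```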



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1059,6 +1071,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
         Objects.requireNonNull(tableJoined, "tableJoined can't be null");
         Objects.requireNonNull(materialized, "materialized can't be null");
 
+

Review Comment:
   nit: accidental extra line?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -459,12 +461,23 @@ private List<StreamsMetadata> rebuildMetadataForSingleTopology(final Map<HostInf
         return rebuiltMetadata;
     }
 
+    private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> {
+        if (!maybeMulticastPartitions.isPresent()) {
+            return null;
+        }
+        if (maybeMulticastPartitions.get().size() != 1) {
+            throw new IllegalArgumentException("The partitions returned by StreamPartitioner#partitions method when used for fetching KeyQueryMetadata for key should be a singleton set");

Review Comment:
   Hm... the IQ (interactive queries) case complicates things a bit, and I think we may have missed it during the KIP discussion/design. There's no reason to enforce that the partitioner return only a single partition here, right? In fact, this would even be incorrect in some cases, namely when the partitioner had originally opted to send this record/key to multiple partitions.
   
   But obviously the current API doesn't allow for that. I think we need to make a small update to the KIP to account for the possibility of multiple partitions here. We can just modify the `KeyQueryMetadata` class to store a set of partitions instead of a single one, and add a new getter method accordingly (i.e. `KeyQueryMetadata.partitions()`).
   
   I guess we should also deprecate the old getter `partition()`, although to be honest most users will likely not be multicasting records, so imo it actually makes sense to keep both. That way no one has to change their code to use a new method that they don't even need/want. Thoughts?
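
To make the proposal concrete, here is one possible shape for a metadata class that stores a set of partitions and keeps both getters. `KeyQueryMetadataSketch` is a hypothetical stand-in (the real `KeyQueryMetadata` also carries host information), and having `partition()` throw for non-singleton sets is an assumption of this sketch, not something decided in the KIP.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for KeyQueryMetadata; host fields are omitted.
final class KeyQueryMetadataSketch {
    private final Set<Integer> partitions;

    KeyQueryMetadataSketch(final Set<Integer> partitions) {
        // Defensive, sorted, read-only copy of the caller's set.
        this.partitions = Collections.unmodifiableSet(new TreeSet<>(partitions));
    }

    // Proposed new getter: every partition the key may have been sent to.
    Set<Integer> partitions() {
        return partitions;
    }

    // Existing-style getter, kept so users who never multicast need no code change.
    // Assumption of this sketch: fail loudly unless exactly one partition is stored.
    int partition() {
        if (partitions.size() != 1) {
            throw new IllegalStateException("Expected exactly one partition but found " + partitions);
        }
        return partitions.iterator().next();
    }
}
```

With this shape, non-multicasting users keep calling `partition()` unchanged, while multicast users call `partitions()` and query each returned partition.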


