Posted to issues@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2022/03/03 03:37:00 UTC

[jira] [Comment Edited] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

    [ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500479#comment-17500479 ] 

shizhengchao edited comment on FLINK-26033 at 3/3/22, 3:36 AM:
---------------------------------------------------------------

[~MartijnVisser] [~renqs]   My testing shows that the RoundRobinPartitioner built into Kafka does not work either: it cannot distribute writes evenly across all partitions because of abortForNewBatch. For example, with 10 partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only sends data to the even-numbered partitions when abortForNewBatch is true. So we should implement a round-robin partitioner in Flink, and it needs to discover the topic's partitions automatically.
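
To illustrate the effect described above, here is a minimal simulation. It is not Kafka code: `RoundRobinSkipDemo` and `simulate` are hypothetical names, and the model assumes the worst case in which every record has to open a new batch, so the producer asks the partitioner for a partition twice per record and only writes to the second answer.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model (assumption, not Kafka source): it only mimics the
// per-topic counter of a round-robin partitioner and a producer that
// calls partition() twice whenever a record needs a new batch.
class RoundRobinSkipDemo {

    // Returns the set of partitions that received at least one record.
    static Set<Integer> simulate(int numPartitions, int numRecords) {
        AtomicInteger counter = new AtomicInteger(0);
        Set<Integer> hit = new TreeSet<>();
        for (int i = 0; i < numRecords; i++) {
            // First call: a partition is chosen, but the append is aborted
            // because that partition needs a new batch. The result is dropped.
            counter.getAndIncrement();
            // Second call: the producer asks again and writes to this partition.
            int used = counter.getAndIncrement() % numPartitions;
            hit.add(used);
        }
        return hit;
    }

    public static void main(String[] args) {
        // With 10 partitions, only every other partition is ever written to.
        System.out.println(simulate(10, 100));
    }
}
```

In this model the counter advances by two per record, so half of the partitions are never selected. Which half is hit depends on where the counter starts; a real producer will only show this pattern to the extent that records actually trigger new batches.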


was (Author: tinny):
[~MartijnVisser] [~renqs]   My testing shows that the RoundRobinPartitioner built into Kafka does not work either: it cannot distribute writes evenly across all partitions because of abortForNewBatch. For example, with 10 partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only sends data to the even-numbered partitions when abortForNewBatch is true. So we should implement a round-robin partitioner in Flink, and it needs to discover the topic's partitions automatically.

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26033
>                 URL: https://issues.apache.org/jira/browse/FLINK-26033
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>            Reporter: shizhengchao
>            Assignee: shizhengchao
>            Priority: Major
>              Labels: pull-request-available
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect. Flink treats 'default' and 'round-robin' as the same strategy.
> {code:java}
> // Resolves the 'sink.partitioner' table option to a Flink partitioner
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 // Both values return empty, so Kafka's default partitioner is used.
>                                 return Optional.empty();
>                             // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
> Both values therefore use Kafka's default partitioner. DefaultPartitioner handles two scenarios:
> 1. When there is no key, it uses a sticky partition: one is picked at random and reused until a new batch is started.
> 2. When there is a key, it takes the murmur2 hash of the key bytes modulo the number of partitions.
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // No key: use the sticky partition (picked at random, changed on a new batch)
>         return stickyPartitionCache.partition(topic, cluster);
>     } 
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, KafkaConnector does not have a true round-robin strategy. But we can borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> // org.apache.kafka.clients.producer.RoundRobinPartitioner
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
>     public void configure(Map<String, ?> configs) {}
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
>             return new AtomicInteger(0);
>         });
>         return counter.getAndIncrement();
>     }
>     public void close() {}
> } {code}
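
As a rough sketch of what a Flink-side round-robin strategy could look like, the class below keeps a per-topic counter and indexes into the partition array it is given on each call. This is an assumption about the design, not the patch attached to this issue: `RoundRobinSketch` is a hypothetical name, and the class is kept standalone so it runs without Flink on the classpath. In Flink the same logic would sit in a subclass of `FlinkKafkaPartitioner`, whose `partition` method also receives the record, key and value.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, standalone on purpose (no Flink dependency).
class RoundRobinSketch {

    // One counter per target topic, as in Kafka's RoundRobinPartitioner.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // Picks the next partition for the topic. The partition array is read on
    // every call, so a longer array (newly added partitions) is used as soon
    // as the caller passes it in.
    int partition(String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for topic " + targetTopic);
        }
        AtomicInteger counter = counters.computeIfAbsent(targetTopic, k -> new AtomicInteger(0));
        // Mask the sign bit so the index stays non-negative after overflow.
        int next = counter.getAndIncrement() & Integer.MAX_VALUE;
        return partitions[next % partitions.length];
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch();
        int[] partitions = {0, 1, 2};
        for (int i = 0; i < 6; i++) {
            System.out.println(sketch.partition("demo-topic", partitions));
        }
    }
}
```

Because the partition is chosen once per record before the record reaches the Kafka producer, the counter should advance exactly once per record, which avoids the skipping seen with a second `partition()` call.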



--
This message was sent by Atlassian Jira
(v8.20.1#820001)