Posted to dev@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2022/02/09 07:12:00 UTC

[jira] [Created] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

shizhengchao created FLINK-26033:
------------------------------------

             Summary: In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
                 Key: FLINK-26033
                 URL: https://issues.apache.org/jira/browse/FLINK-26033
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.14.3, 1.13.3
            Reporter: shizhengchao


In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect. Flink treats 'default' and 'round-robin' as the same strategy.
{code:java}
// Flink Kafka table connector: maps the 'sink.partitioner' option to a partitioner
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
        ReadableConfig tableOptions, ClassLoader classLoader) {
    return tableOptions
            .getOptional(SINK_PARTITIONER)
            .flatMap(
                    (String partitioner) -> {
                        switch (partitioner) {
                            case SINK_PARTITIONER_VALUE_FIXED:
                                return Optional.of(new FlinkFixedPartitioner<>());
                            case SINK_PARTITIONER_VALUE_DEFAULT:
                            case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                return Optional.empty();
                                // Default fallback to full class name of the partitioner.
                            default:
                                return Optional.of(
                                        initializePartitioner(partitioner, classLoader));
                        }
                    });
} {code}
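Both values fall back to Kafka's default partitioner, which is not round-robin. DefaultPartitioner handles two scenarios:
1. When there is no key, the partition comes from the sticky partition cache, so it is effectively random rather than rotating
2. When there is a key, the partition is the hash of the key modulo the number of partitions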
{code:java}
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        // No key: use the sticky partition cache (effectively random, not round-robin)
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
} {code}
Therefore, KafkaConnector has no real round-robin strategy. However, we could borrow from Kafka's RoundRobinPartitioner:
{code:java}
// org.apache.kafka.clients.producer.RoundRobinPartitioner
public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {}

} {code}
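To illustrate how the same idea might look on the Flink side, here is a minimal standalone sketch. It is not a proposed patch: the class name RoundRobinPartitionerSketch is hypothetical, and the class deliberately does not depend on Flink so that it runs on its own. The partition(...) method mirrors the parameter list of FlinkKafkaPartitioner#partition(record, key, value, targetTopic, partitions); in Flink the class would extend FlinkKafkaPartitioner<T> instead. A plain HashMap is assumed to be enough because each sink subtask is expected to use its own partitioner instance.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical standalone sketch of a round-robin partitioner.
 * In Flink it would extend FlinkKafkaPartitioner<T> and override
 * partition(...) with the same parameter list.
 */
class RoundRobinPartitionerSketch<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    // One counter per target topic, as in Kafka's RoundRobinPartitioner.
    private final Map<String, Integer> topicCounters = new HashMap<>();

    public int partition(
            T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException(
                    "Partitions of the target topic are empty.");
        }
        // merge() stores the incremented count; subtract 1 to get the
        // zero-based sequence number of this record for the topic.
        int next = topicCounters.merge(targetTopic, 1, Integer::sum) - 1;
        // floorMod keeps the index non-negative even after int overflow.
        return partitions[Math.floorMod(next, partitions.length)];
    }

    public static void main(String[] args) {
        RoundRobinPartitionerSketch<String> partitioner =
                new RoundRobinPartitionerSketch<>();
        int[] partitions = {0, 1, 2};
        // Keys are ignored on purpose: records rotate over the partitions.
        for (int i = 0; i < 5; i++) {
            System.out.println(
                    partitioner.partition("record-" + i, null, null, "demo-topic", partitions));
        }
    }
}
```

With something along these lines, the SINK_PARTITIONER_VALUE_ROUND_ROBIN case in getFlinkKafkaPartitioner could return an instance of such a partitioner instead of Optional.empty(). Unlike Kafka's RoundRobinPartitioner, this sketch does not look at partition availability, since it only receives the partition id array.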


