You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/07/07 14:32:00 UTC
[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
[ https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563814#comment-17563814 ]
Martijn Visser commented on FLINK-26033:
----------------------------------------
[~renqs]Do we still want to review this?
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
> --------------------------------------------------------------------------------------------------
>
> Key: FLINK-26033
> URL: https://issues.apache.org/jira/browse/FLINK-26033
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
> Reporter: shizhengchao
> Assignee: shizhengchao
> Priority: Major
> Labels: pull-request-available, stale-assigned
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect. Flink treats 'default' and 'round-robin' as the same strategy.
> {code:java}
> //代码占位符
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
> ReadableConfig tableOptions, ClassLoader classLoader) {
> return tableOptions
> .getOptional(SINK_PARTITIONER)
> .flatMap(
> (String partitioner) -> {
> switch (partitioner) {
> case SINK_PARTITIONER_VALUE_FIXED:
> return Optional.of(new FlinkFixedPartitioner<>());
> case SINK_PARTITIONER_VALUE_DEFAULT:
> case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
> return Optional.empty();
> // Default fallback to full class name of the partitioner.
> default:
> return Optional.of(
> initializePartitioner(partitioner, classLoader));
> }
> });
> } {code}
> They both use kafka's default partitioner, but the actual There are two scenarios for the partition on DefaultPartitioner:
> 1. Random when there is no key
> 2. When there is a key, take the modulo according to the key
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
> if (keyBytes == null) {
> // Random when there is no key
> return stickyPartitionCache.partition(topic, cluster);
> }
> List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
> int numPartitions = partitions.size();
> // hash the keyBytes to choose a partition
> return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, KafkaConnector does not have a round-robin strategy.But we can borrow from kafka's RoundRobinPartitioner
> {code:java}
> //代码占位符
> public class RoundRobinPartitioner implements Partitioner {
> private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
> public void configure(Map<String, ?> configs) {}
> /**
> * Compute the partition for the given record.
> *
> * @param topic The topic name
> * @param key The key to partition on (or null if no key)
> * @param keyBytes serialized key to partition on (or null if no key)
> * @param value The value to partition on or null
> * @param valueBytes serialized value to partition on or null
> * @param cluster The current cluster metadata
> */
> @Override
> public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
> List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
> int numPartitions = partitions.size();
> int nextValue = nextValue(topic);
> List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
> if (!availablePartitions.isEmpty()) {
> int part = Utils.toPositive(nextValue) % availablePartitions.size();
> return availablePartitions.get(part).partition();
> } else {
> // no partitions are available, give a non-available partition
> return Utils.toPositive(nextValue) % numPartitions;
> }
> }
> private int nextValue(String topic) {
> AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
> return new AtomicInteger(0);
> });
> return counter.getAndIncrement();
> }
> public void close() {}
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)