Posted to dev@flink.apache.org by "shizhengchao (Jira)" <ji...@apache.org> on 2022/02/09 07:12:00 UTC
[jira] [Created] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
shizhengchao created FLINK-26033:
------------------------------------
Summary: In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect
Key: FLINK-26033
URL: https://issues.apache.org/jira/browse/FLINK-26033
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.14.3, 1.13.3
Reporter: shizhengchao
In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect. Flink treats 'default' and 'round-robin' as the same strategy.
{code:java}
// Flink Kafka table connector: resolves the 'sink.partitioner' option
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
        ReadableConfig tableOptions, ClassLoader classLoader) {
    return tableOptions
            .getOptional(SINK_PARTITIONER)
            .flatMap(
                    (String partitioner) -> {
                        switch (partitioner) {
                            case SINK_PARTITIONER_VALUE_FIXED:
                                return Optional.of(new FlinkFixedPartitioner<>());
                            case SINK_PARTITIONER_VALUE_DEFAULT:
                            case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                // Both values end up here: no Flink partitioner is set,
                                // so the Kafka producer's partitioner decides.
                                return Optional.empty();
                            // Default fallback to full class name of the partitioner.
                            default:
                                return Optional.of(
                                        initializePartitioner(partitioner, classLoader));
                        }
                    });
} {code}
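To keep the two options apart, the switch would need a dedicated branch for SINK_PARTITIONER_VALUE_ROUND_ROBIN instead of sharing the 'default' branch. The sketch below is a hypothetical, dependency-free model of that resolution logic only: the PartitionerChoice enum and resolvePartitioner method are illustrative names, not Flink API, and in Flink the ROUND_ROBIN branch would have to return an actual FlinkKafkaPartitioner (or configure the producer) rather than an enum value.

```java
import java.util.Optional;

// Hypothetical stand-in for the partitioner objects Flink would create;
// not part of the Flink API.
enum PartitionerChoice { FIXED, ROUND_ROBIN, CUSTOM_CLASS }

final class PartitionerResolver {
    // Option values as they appear in the issue.
    static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
    static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
    static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";

    /**
     * Returns the partitioner to install on the Flink side, or Optional.empty()
     * to leave partitioning to the Kafka producer's configured partitioner.
     */
    static Optional<PartitionerChoice> resolvePartitioner(String partitioner) {
        switch (partitioner) {
            case SINK_PARTITIONER_VALUE_FIXED:
                return Optional.of(PartitionerChoice.FIXED);
            case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                // Own branch: no longer shares the 'default' behavior.
                return Optional.of(PartitionerChoice.ROUND_ROBIN);
            case SINK_PARTITIONER_VALUE_DEFAULT:
                return Optional.empty();
            default:
                // Any other value is treated as a partitioner class name.
                return Optional.of(PartitionerChoice.CUSTOM_CLASS);
        }
    }
}
```

With this shape, resolvePartitioner("default") and resolvePartitioner("round-robin") no longer return the same result, which is exactly what the current code fails to guarantee.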
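Both values therefore leave partitioning to Kafka's DefaultPartitioner, which picks a partition in one of two ways:
1. When the record has no key, it uses a sticky partition: one partition is chosen at random and reused until a new batch is started.
2. When the record has a key, it hashes the key bytes with murmur2 and takes the result modulo the number of partitions.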
{code:java}
// org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        // No key: use this topic's sticky partition (chosen at random, kept until a new batch starts)
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
} {code}
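For keyed records, the partition is a pure function of the key bytes, so every record with the same key lands on the same partition no matter how many records came before it. The self-contained sketch below models that keyed branch with a stand-in hash (Arrays.hashCode over the key bytes) rather than Kafka's murmur2, so the concrete partition numbers differ from a real producer; only the determinism matters here. keyedPartition and routeSameKey are hypothetical helper names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyedPartitionDemo {
    // Same idea as Kafka's Utils.toPositive: clear the sign bit instead of
    // using Math.abs, which stays negative for Integer.MIN_VALUE.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Stand-in for the keyed branch of DefaultPartitioner.
    // NOTE: Arrays.hashCode is used instead of Kafka's murmur2.
    static int keyedPartition(byte[] keyBytes, int numPartitions) {
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    // Routes `records` records that all carry the same key and returns the
    // partition chosen for each one.
    static int[] routeSameKey(String key, int records, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int[] chosen = new int[records];
        for (int i = 0; i < records; i++) {
            chosen[i] = keyedPartition(keyBytes, numPartitions);
        }
        return chosen;
    }
}
```

With three partitions, routeSameKey("user-42", 6, 3) returns the same partition six times, whereas a round-robin strategy would visit each partition twice.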
Therefore, the Kafka connector has no real round-robin strategy: neither branch cycles through the partitions record by record. We could borrow from Kafka's RoundRobinPartitioner:
{code:java}
package org.apache.kafka.clients.producer;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class RoundRobinPartitioner implements Partitioner {
    // One counter per topic, so each topic rotates through its partitions independently
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            // Rotate over the partitions that currently have a leader
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }

    @Override
    public void close() {}
} {code}
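Alternatively, the rotation could live on the Flink side. Flink's FlinkKafkaPartitioner passes its partition method the record, the serialized key and value, the target topic, and an int[] of the topic's partition ids. The sketch below keeps that method shape but drops the Flink dependency so it runs standalone; RoundRobinSketchPartitioner is a hypothetical name, and a real implementation would extend FlinkKafkaPartitioner instead. Unlike Kafka's version it cannot skip unavailable partitions, since it only receives partition ids.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, dependency-free sketch; a real version would extend Flink's
// FlinkKafkaPartitioner<T> and override its partition(...) method.
final class RoundRobinSketchPartitioner<T> {
    // One counter per topic, as in Kafka's RoundRobinPartitioner.
    private final Map<String, AtomicInteger> topicCounters = new ConcurrentHashMap<>();

    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Record, key and value are ignored on purpose: rotation does not depend on content.
        int next = topicCounters
                .computeIfAbsent(targetTopic, t -> new AtomicInteger(0))
                .getAndIncrement();
        // Clear the sign bit so the index stays valid after the counter overflows.
        return partitions[(next & 0x7fffffff) % partitions.length];
    }
}
```

Calling partition six times for topic "orders" with partitions {0, 1, 2} yields 0, 1, 2, 0, 1, 2, and a different topic starts its own rotation at 0. Because each parallel sink subtask would hold its own partitioner instance, the counters would be per subtask, so every subtask cycles over all partitions independently.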
--
This message was sent by Atlassian Jira
(v8.20.1#820001)