You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/12 02:59:11 UTC
[inlong] branch master updated: [INLONG-5451][Sort] Kafka sink use customized partitioner (#5454)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 036fce858 [INLONG-5451][Sort] Kafka sink use customized partitioner (#5454)
036fce858 is described below
commit 036fce858b60b82198f1e08d35ce600119c73324
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Fri Aug 12 10:59:06 2022 +0800
[INLONG-5451][Sort] Kafka sink use customized partitioner (#5454)
---
.../sink/kafka/KafkaProducerCluster.java | 5 +-
.../standalone/sink/kafka/PartitionerSelector.java | 251 +++++++++++++++++++++
.../sink/kafka/PartitionerSelectorTest.java | 69 ++++++
3 files changed, 324 insertions(+), 1 deletion(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 94d468760..ba7118fcb 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -79,13 +79,16 @@ public class KafkaProducerCluster implements LifecycleAware {
this.state = LifecycleState.START;
try {
Properties props = new Properties();
- props.putAll(context.getParameters());
+ props.put(
+ ProducerConfig.PARTITIONER_CLASS_CONFIG,
+ context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName()));
props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
context.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(
ProducerConfig.CLIENT_ID_CONFIG,
context.getString(ProducerConfig.CLIENT_ID_CONFIG) + "-" + workerName);
+ props.putAll(context.getParameters());
LOG.info("init kafka client info: " + props);
producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
Preconditions.checkNotNull(producer);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/PartitionerSelector.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/PartitionerSelector.java
new file mode 100644
index 000000000..e0c4273b2
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/PartitionerSelector.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.standalone.sink.kafka;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.internals.StickyPartitionCache;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Divides the partitions of a topic among the producers that share the same client.id.
+ * <p>The default partitioning strategy: if there is more than one producer, and there are
+ * more partitions than producers, the producers divide the partitions equally.
+ * <ul>
+ * <li>If a partition is specified in the record, use it.</li>
+ * <li>If no partition is specified but a key is present, choose a partition based on a hash of the key.</li>
+ * <li>If no partition or key is present, choose the sticky partition, which changes when the batch is full.</li>
+ * </ul>
+ *
+ * <p>See KIP-480 for details about sticky partitioning.
+ */
+public class PartitionerSelector implements Partitioner {
+
+    /**
+     * Number of producers currently registered under each base client id.
+     * Static, so it is shared by every partitioner instance created in this JVM.
+     */
+    private static final Map<String, AtomicInteger> clientIdIndex = new ConcurrentHashMap<>();
+
+    private final StickyPartitionCache stickyPartitionCache = new MyStickyPartitionCache();
+
+    /**
+     * Index of the current producer among the producers sharing {@link #clientId}.
+     */
+    private int index;
+
+    /**
+     * Base client id (the configured client.id up to, but excluding, the first '-').
+     * Never null, because {@link ConcurrentHashMap} rejects null keys.
+     */
+    private String clientId = "";
+
+    /**
+     * Whether this instance currently holds a slot in {@link #clientIdIndex}.
+     */
+    private boolean registered;
+
+    /**
+     * Assign an index to this producer and count it under its base client id.
+     *
+     * <p>A missing client.id is treated as the empty string instead of failing with a
+     * NullPointerException on the map lookup.
+     *
+     * @param configs producer configuration; only {@code client.id} is read
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        Object configured = configs.get(ProducerConfig.CLIENT_ID_CONFIG);
+        String fullId = configured == null ? "" : configured.toString();
+
+        // Keep only the part before the first '-'. indexOf/substring is used instead of
+        // split("-")[0] because split drops trailing empty strings and yields an empty array
+        // for ids such as "-", which made the [0] access throw.
+        int dash = fullId.indexOf('-');
+        clientId = dash >= 0 ? fullId.substring(0, dash) : fullId;
+
+        index = clientIdIndex.computeIfAbsent(clientId, key -> new AtomicInteger(0)).getAndIncrement();
+        registered = true;
+    }
+
+    /**
+     * Compute the partition for the given record.
+     *
+     * @param topic The topic name
+     * @param key The key to partition on (or null if no key)
+     * @param keyBytes serialized key to partition on (or null if no key)
+     * @param value The value to partition on or null
+     * @param valueBytes serialized value to partition on or null
+     * @param cluster The current cluster metadata
+     * @return the sticky partition when {@code keyBytes} is null, otherwise the key hash modulo
+     *         the number of candidate partitions
+     * @throws IllegalStateException if a key is present but the cluster metadata lists no
+     *         partition for the topic
+     */
+    @Override
+    public int partition(String topic, Object key, byte[] keyBytes,
+            Object value, byte[] valueBytes, Cluster cluster) {
+        if (keyBytes == null) {
+            return stickyPartitionCache.partition(topic, cluster);
+        }
+
+        // Prefer the available partitions; fall back to every known partition of the topic.
+        List<PartitionInfo> partitions = cluster.availablePartitionsForTopic(topic);
+        if (partitions.isEmpty()) {
+            partitions = cluster.partitionsForTopic(topic);
+        }
+        int numPartitions = partitions.size();
+        if (numPartitions == 0) {
+            // Fail with a clear message instead of an ArithmeticException from "% 0".
+            throw new IllegalStateException("No partition metadata for topic " + topic);
+        }
+
+        // hash the keyBytes to choose a partition
+        // NOTE(review): the result is an index into the candidate list but is returned as a
+        // partition id; the two only coincide when the candidates are exactly partitions
+        // 0..n-1 - confirm this is intended before relying on it with unavailable partitions.
+        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+    }
+
+    /**
+     * Reduce the number of producers counted under this client id when this one is closed.
+     * Does nothing if {@link #configure(Map)} was never called or the partitioner is already closed,
+     * so the shared counter is decremented at most once per registration.
+     */
+    @Override
+    public void close() {
+        if (!registered) {
+            return;
+        }
+        registered = false;
+        AtomicInteger counter = clientIdIndex.get(clientId);
+        if (counter != null) {
+            counter.decrementAndGet();
+        }
+    }
+
+    /**
+     * If a batch completed for the current sticky partition, change the sticky partition. Alternately, if
+     * no sticky partition has been determined, set one.
+     */
+    @Override
+    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
+        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
+    }
+
+    /**
+     * Customized sticky partition selector that walks through the partitions assigned to this
+     * producer in order instead of picking one at random.
+     */
+    class MyStickyPartitionCache extends StickyPartitionCache {
+
+        /**
+         * Current sticky partition per topic.
+         */
+        private final ConcurrentMap<String, Integer> indexCache;
+
+        public MyStickyPartitionCache() {
+            this.indexCache = new ConcurrentHashMap<>();
+        }
+
+        /**
+         * Return the cached sticky partition of the topic, selecting one first if none is cached.
+         */
+        @Override
+        public int partition(String topic, Cluster cluster) {
+            Integer part = indexCache.get(topic);
+            if (part == null) {
+                return nextPartition(topic, cluster, -1);
+            }
+            return part;
+        }
+
+        /**
+         * Select the next sticky partition for the topic.
+         *
+         * @param topic The topic name
+         * @param cluster The current cluster metadata
+         * @param prevPartition the partition the finished batch was sent to, or -1 if none
+         * @return the sticky partition to use from now on; never negative
+         */
+        @Override
+        public int nextPartition(String topic, Cluster cluster, int prevPartition) {
+            Integer oldPart = indexCache.get(topic);
+            // The sticky partition was already moved away from prevPartition: keep it.
+            if (oldPart != null && oldPart != prevPartition) {
+                return oldPart;
+            }
+
+            List<PartitionInfo> candidates = cluster.availablePartitionsForTopic(topic);
+            if (candidates == null || candidates.isEmpty()) {
+                if (oldPart != null) {
+                    // Nothing better to move to: stay on the current sticky partition.
+                    return prevPartition;
+                }
+                // First selection for this topic: fall back to every known partition, as the
+                // keyed path does, instead of handing the invalid id -1 back to the producer.
+                candidates = cluster.partitionsForTopic(topic);
+                if (candidates == null || candidates.isEmpty()) {
+                    return Math.max(prevPartition, 0);
+                }
+            }
+
+            int candidateCount = candidates.size();
+            if (candidateCount > 1) {
+                // number of producers under this client id
+                AtomicInteger counter = clientIdIndex.get(clientId);
+                int producerCount = counter == null ? 1 : counter.get();
+
+                // more than 1 producer and more partitions than producers, assign them equally.
+                if (producerCount > 1 && candidateCount > producerCount) {
+                    // The index can be >= producerCount once a producer with a lower index has
+                    // closed; wrap it so this producer still gets a non-empty share.
+                    List<PartitionInfo> share =
+                            averageAssign(candidates, producerCount, Math.floorMod(index, producerCount));
+                    if (!share.isEmpty()) {
+                        candidates = share;
+                        candidateCount = share.size();
+                    }
+                }
+            }
+
+            // only one candidate, or nothing cached yet: take the first candidate
+            if (candidateCount == 1 || oldPart == null) {
+                int newPart = candidates.get(0).partition();
+                indexCache.put(topic, newPart);
+                return newPart;
+            }
+
+            // find the cursor of the old partition among the candidates
+            int indexOldPart = -1;
+            for (int i = 0; i < candidateCount; i++) {
+                if (candidates.get(i).partition() == oldPart) {
+                    indexOldPart = i;
+                    break;
+                }
+            }
+
+            // Restart from the first candidate when the old partition is no longer a candidate,
+            // otherwise advance to the following one, wrapping around at the end.
+            int indexNewPart = indexOldPart < 0 ? 0 : (indexOldPart + 1) % candidateCount;
+            int newPart = candidates.get(indexNewPart).partition();
+            // Only swap if the cache still holds prevPartition, so a concurrent update wins.
+            indexCache.replace(topic, prevPartition, newPart);
+            return newPart;
+        }
+    }
+
+    /**
+     * Split a list into {@code pageSize} consecutive groups of nearly equal size and return the
+     * group at position {@code index}. The first {@code source.size() % pageSize} groups hold one
+     * extra element.
+     *
+     * @param source source to be grouped
+     * @param pageSize number of groups; must be positive
+     * @param index position of the requested group, starting at 0
+     * @return {@code source} itself if it is null or empty, a new empty list if the group starts
+     *         past the end of {@code source}, otherwise a {@link List#subList(int, int)} view
+     */
+    public static <T> List<T> averageAssign(List<T> source, int pageSize, int index) {
+
+        if (source == null || source.isEmpty()) {
+            return source;
+        }
+
+        int remainder = source.size() % pageSize;
+        int number = source.size() / pageSize;
+
+        // Groups before this one contribute "number" elements each, plus one extra element
+        // for each of the first "remainder" groups.
+        int fromIndex = index * number + Math.min(index, remainder);
+        boolean hasRemainder = index < remainder;
+        int toIndex = fromIndex + number + (hasRemainder ? 1 : 0);
+
+        if (fromIndex >= source.size()) {
+            return new ArrayList<>();
+        }
+
+        if (toIndex > source.size()) {
+            toIndex = source.size();
+        }
+
+        return source.subList(fromIndex, toIndex);
+    }
+
+}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/kafka/PartitionerSelectorTest.java b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/kafka/PartitionerSelectorTest.java
new file mode 100644
index 000000000..4f06dfe9e
--- /dev/null
+++ b/inlong-sort-standalone/sort-standalone-source/src/test/java/org/apache/inlong/sort/standalone/sink/kafka/PartitionerSelectorTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.sort.standalone.sink.kafka;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * Test for PartitionerSelector
+ */
+public class PartitionerSelectorTest {
+
+ private static final String V_INLONG_ID = "testId";
+ private static final String V_INLONG_ID_TOPIC = "U_TOPIC_testId";
+ private static final String V_LOCALIP = "127.0.0.1";
+ private static final int V_PORT = 1234;
+
+ @Test
+ public void test() {
+ try {
+ Node node = new Node(0, V_LOCALIP, V_PORT);
+ List<Node> nodes = new ArrayList<>();
+ nodes.add(node);
+ List<PartitionInfo> partitionInfos = new ArrayList<>();
+ partitionInfos.add(new PartitionInfo(V_INLONG_ID_TOPIC, 0,
+ node, new Node[0], new Node[0]));
+ partitionInfos.add(new PartitionInfo(V_INLONG_ID_TOPIC, 1,
+ node, new Node[0], new Node[0]));
+
+ PartitionerSelector obj = new PartitionerSelector();
+ HashMap<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
+ obj.configure(configs);
+ Cluster cluster = new Cluster("clusterId", nodes, partitionInfos,
+ new HashSet<String>(), new HashSet<String>());
+ obj.partition(V_INLONG_ID_TOPIC, null, null, null,null, cluster);
+ obj.partition(V_INLONG_ID_TOPIC, "", V_INLONG_ID.getBytes(),
+ V_INLONG_ID.getBytes(), V_INLONG_ID.getBytes(), cluster);
+ obj.onNewBatch(V_INLONG_ID_TOPIC, cluster, 0);
+ obj.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
\ No newline at end of file