Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/09 13:55:33 UTC
[incubator-seatunnel] branch dev updated: [Improve][connector][kafka] sink support custom partition (#3041)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ebddc18c4 [Improve][connector][kafka] sink support custom partition (#3041)
ebddc18c4 is described below
commit ebddc18c41c30be8955a0378d6505b9a2cbd7feb
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Sun Oct 9 21:55:29 2022 +0800
[Improve][connector][kafka] sink support custom partition (#3041)
---
docs/en/connector-v2/sink/Kafka.md | 99 ++++++++++++++++++++++
.../connectors/seatunnel/kafka/config/Config.java | 13 ++-
.../serialize/DefaultSeaTunnelRowSerializer.java | 13 ++-
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 19 ++++-
.../kafka/sink/MessageContentPartitioner.java | 59 +++++++++++++
5 files changed, 199 insertions(+), 4 deletions(-)
diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
new file mode 100644
index 000000000..3dbc6af89
--- /dev/null
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -0,0 +1,99 @@
+# Kafka
+
+> Kafka sink connector
+## Description
+
+Write Rows to a Kafka topic.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we will use 2pc (two-phase commit) to guarantee that messages are sent to Kafka exactly once.
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name | type | required | default value |
+| ------------------ | ---------------------- | -------- | ------------- |
+| topic | string | yes | - |
+| bootstrap.servers | string | yes | - |
+| kafka.* | kafka producer config | no | - |
+| semantic | string | no | NON |
+| partition | int | no | - |
+| assign_partitions | list | no | - |
+| transaction_prefix | string | no | - |
+| common-options | | no | - |
+
+### topic [string]
+
+Kafka Topic.
+
+### bootstrap.servers [string]
+
+The Kafka broker list.
+
+### kafka.* [kafka producer config]
+
+In addition to the required parameters above, users can also specify multiple optional parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs).
+
+The way to specify a parameter is to add the prefix `kafka.` to the original parameter name. For example, `request.timeout.ms` is specified as `kafka.request.timeout.ms = 60000`. If these optional parameters are not specified, they will use the default values given in the official Kafka documentation.
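To make the prefix convention concrete, here is a minimal sketch of mapping `kafka.`-prefixed options onto producer properties. The `KafkaOptionMapper` class and its `toProducerProperties` helper are illustrative only, not part of the connector's API; the connector performs an equivalent mapping internally.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaOptionMapper {

    private static final String PREFIX = "kafka.";

    // Copy every option that starts with "kafka." into producer Properties,
    // dropping the prefix to recover the original Kafka parameter name.
    public static Properties toProducerProperties(Map<String, String> options) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                props.setProperty(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("kafka.request.timeout.ms", "60000");
        options.put("topic", "seatunnel"); // unprefixed options are not producer properties
        Properties props = toProducerProperties(options);
        System.out.println(props.getProperty("request.timeout.ms")); // prints 60000
    }
}
```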
+
+### semantic [string]
+
+Semantics can be chosen from EXACTLY_ONCE, AT_LEAST_ONCE or NON; the default is NON.
+
+In EXACTLY_ONCE, the producer will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint.
+
+In AT_LEAST_ONCE, the producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint.
+
+NON provides no guarantees: messages may be lost if the Kafka broker has issues, and messages may be duplicated.
+
+### partition [int]
+
+We can specify a partition; all messages will be sent to this partition.
+
+### assign_partitions [list]
+
+We can decide which partition to send a message to based on its content. The function of this parameter is to distribute messages across partitions by keyword.
+
+For example, there are five partitions in total, and the assign_partitions field in config is as follows:
+assign_partitions = ["shoe", "clothing"]
+
+Then messages containing "shoe" will be sent to partition zero, because "shoe" has subscript zero in assign_partitions, and messages containing "clothing" will be sent to partition one. All other messages will be distributed over the remaining partitions by a hash algorithm.
+
+This function is implemented by the `MessageContentPartitioner` class, which implements the `org.apache.kafka.clients.producer.Partitioner` interface. If we need custom partitioning, we need to implement this interface as well.
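The routing rule above can be sketched as a standalone method. The `PartitionRouting` class and its `choosePartition` helper are illustrative; in the connector this logic lives inside `MessageContentPartitioner`.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionRouting {

    // A message matching the i-th keyword goes to partition i; all other
    // messages are hashed into the remaining partitions.
    public static int choosePartition(String message, List<String> assignPartitions, int numPartitions) {
        for (int i = 0; i < assignPartitions.size(); i++) {
            if (message.contains(assignPartitions.get(i))) {
                return i;
            }
        }
        int assigned = assignPartitions.size();
        // Mask the sign bit so the index is non-negative, then map into
        // partitions [assigned, numPartitions).
        return ((message.hashCode() & Integer.MAX_VALUE) % (numPartitions - assigned)) + assigned;
    }

    public static void main(String[] args) {
        List<String> keywords = Arrays.asList("shoe", "clothing");
        System.out.println(choosePartition("red shoe", keywords, 5));      // prints 0
        System.out.println(choosePartition("blue clothing", keywords, 5)); // prints 1
        // Any other message lands in one of the remaining partitions 2..4.
        int p = choosePartition("unrelated message", keywords, 5);
        System.out.println(p >= 2 && p < 5); // prints true
    }
}
```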
+
+### transaction_prefix [string]
+
+If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction.
+Kafka distinguishes transactions by transactionId. This parameter is the prefix of the Kafka transactionId; make sure different jobs use different prefixes.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+
+ kafka {
+ topic = "seatunnel"
+ bootstrap.servers = "localhost:9092"
+ partition = 3
+ kafka.request.timeout.ms = 60000
+ semantic = EXACTLY_ONCE
+ }
+
+}
+```
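For content-based routing, a hypothetical config could look like this. It assumes the topic has at least three partitions, so messages that match neither keyword still have partitions left over for the hash fallback.

```hocon
sink {

  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    assign_partitions = ["shoe", "clothing"]
  }

}
```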
+
+
+### change log
+#### next version
+
+ - Add kafka sink doc
 - New feature : Kafka sink can send all messages to a specified partition
 - New feature : Determine the partition Kafka sends to based on the message content
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index d48d12cf6..0502afda3 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -40,9 +40,8 @@ public class Config {
*/
public static final String CONSUMER_GROUP = "consumer.group";
-
/**
- * consumer group of kafka client consume message.
+ * consumer offset will be periodically committed in the background.
*/
public static final String COMMIT_ON_CHECKPOINT = "commit_on_checkpoint";
@@ -50,4 +49,14 @@ public class Config {
* The prefix of kafka's transactionId, make sure different job use different prefix.
*/
public static final String TRANSACTION_PREFIX = "transaction_prefix";
+
+ /**
+ * Send information according to the specified partition.
+ */
+ public static final String PARTITION = "partition";
+
+ /**
+ * Determine the partition to send based on the content of the message.
+ */
+ public static final String ASSIGN_PARTITIONS = "assign_partitions";
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index e80a7d306..29599bbdb 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
+ private int partition = -1;
private final String topic;
private final JsonSerializationSchema jsonSerializationSchema;
@@ -33,8 +34,17 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byt
this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType);
}
+ public DefaultSeaTunnelRowSerializer(String topic, int partition, SeaTunnelRowType seaTunnelRowType) {
+ this(topic, seaTunnelRowType);
+ this.partition = partition;
+ }
+
@Override
public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
- return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
+ if (this.partition != -1) {
+ return new ProducerRecord<>(topic, this.partition, null, jsonSerializationSchema.serialize(row));
+ } else {
+ return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index b577067ef..1f61482e7 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
@@ -51,6 +53,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
private String transactionPrefix;
private long lastCheckpointId = 0;
+ private int partition;
private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
@@ -71,6 +74,12 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.pluginConfig = pluginConfig;
+ if (pluginConfig.hasPath(PARTITION)) {
+ this.partition = pluginConfig.getInt(PARTITION);
+ }
+ if (pluginConfig.hasPath(ASSIGN_PARTITIONS)) {
+ MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS));
+ }
if (pluginConfig.hasPath(TRANSACTION_PREFIX)) {
this.transactionPrefix = pluginConfig.getString(TRANSACTION_PREFIX);
} else {
@@ -129,6 +138,9 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
kafkaConfig.entrySet().forEach(entry -> {
kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
});
+ if (pluginConfig.hasPath(ASSIGN_PARTITIONS)) {
+ kafkaProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
+ }
kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
@@ -137,7 +149,11 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
// todo: parse the target field from config
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
+ if (pluginConfig.hasPath(PARTITION)) {
+ return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), this.partition, seaTunnelRowType);
+ } else {
+ return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
+ }
}
private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/MessageContentPartitioner.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/MessageContentPartitioner.java
new file mode 100644
index 000000000..c5fa37729
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/MessageContentPartitioner.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.List;
+import java.util.Map;
+
+public class MessageContentPartitioner implements Partitioner {
+ private static List<String> ASSIGN_PARTITIONS;
+
+ public static void setAssignPartitions(List<String> assignPartitionList) {
+ ASSIGN_PARTITIONS = assignPartitionList;
+ }
+
+ @Override
+ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+ List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+ int numPartitions = partitions.size();
+
+ int assignPartitionsSize = ASSIGN_PARTITIONS.size();
+ String message = new String(valueBytes, java.nio.charset.StandardCharsets.UTF_8);
+ for (int i = 0; i < assignPartitionsSize; i++) {
+ if (message.contains(ASSIGN_PARTITIONS.get(i))) {
+ return i;
+ }
+ }
+ //Choose one of the remaining partitions according to the hashcode.
+ return ((message.hashCode() & Integer.MAX_VALUE) % (numPartitions - assignPartitionsSize)) + assignPartitionsSize;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> map) {
+
+ }
+}