Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/09 13:55:33 UTC

[incubator-seatunnel] branch dev updated: [Improve][connector][kafka] sink support custom partition (#3041)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ebddc18c4 [Improve][connector][kafka] sink support custom partition (#3041)
ebddc18c4 is described below

commit ebddc18c41c30be8955a0378d6505b9a2cbd7feb
Author: TaoZex <45...@users.noreply.github.com>
AuthorDate: Sun Oct 9 21:55:29 2022 +0800

    [Improve][connector][kafka] sink support custom partition (#3041)
---
 docs/en/connector-v2/sink/Kafka.md                 | 99 ++++++++++++++++++++++
 .../connectors/seatunnel/kafka/config/Config.java  | 13 ++-
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 13 ++-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 19 ++++-
 .../kafka/sink/MessageContentPartitioner.java      | 59 +++++++++++++
 5 files changed, 199 insertions(+), 4 deletions(-)

diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
new file mode 100644
index 000000000..3dbc6af89
--- /dev/null
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -0,0 +1,99 @@
+# Kafka
+
+> Kafka sink connector
+
+## Description
+
+Write Rows to a Kafka topic.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use two-phase commit (2PC) to guarantee that each message is sent to Kafka exactly once.
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+
+## Options
+
+| name               | type                   | required | default value |
+| ------------------ | ---------------------- | -------- | ------------- |
+| topic              | string                 | yes      | -             |
+| bootstrap.servers  | string                 | yes      | -             |
+| kafka.*            | kafka producer config  | no       | -             |
+| semantic           | string                 | no       | NON           |
+| partition          | int                    | no       | -             |
+| assign_partitions  | list                   | no       | -             |
+| transaction_prefix | string                 | no       | -             |
+| common-options     |                        | no       | -             |
+
+### topic [string]
+
+Kafka Topic.
+
+### bootstrap.servers [string]
+
+Kafka broker list.
+
+### kafka.* [kafka producer config]
+
+In addition to the mandatory parameters above, the user can also specify optional parameters for the Kafka `producer` client, covering [all the producer parameters in the official Kafka documentation](https://kafka.apache.org/documentation.html#producerconfigs).
+
+To specify an optional parameter, add the prefix `kafka.` to the original parameter name. For example, `request.timeout.ms` is specified as `kafka.request.timeout.ms = 60000`. Optional parameters that are not specified use the default values given in the official Kafka documentation.
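+
+A minimal sketch of the pass-through form, assuming you want to tune producer timeouts and batching (the values are illustrative, not recommendations):
+
+```hocon
+sink {
+  kafka {
+    topic = "seatunnel"
+    bootstrap.servers = "localhost:9092"
+    # forwarded to the Kafka producer as request.timeout.ms
+    kafka.request.timeout.ms = 60000
+    # forwarded to the Kafka producer as batch.size
+    kafka.batch.size = 16384
+  }
+}
+```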
+
+### semantic [string]
+
+The write semantics, one of EXACTLY_ONCE, AT_LEAST_ONCE, or NON. The default is NON.
+
+In EXACTLY_ONCE, the producer writes all messages in a Kafka transaction that is committed to Kafka on a checkpoint.
+
+In AT_LEAST_ONCE, the producer waits on a checkpoint for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer.
+
+NON provides no guarantees: messages may be lost if there are issues on the Kafka broker, and messages may be duplicated.
+
+### partition [int]
+
+Specify the partition to write to; all messages will be sent to this partition.
+
+### assign_partitions [list]
+
+Decide which partition each message is sent to based on its content. This parameter routes a message to a partition by matching keywords in the message.
+
+For example, suppose there are five partitions in total and the assign_partitions field in the config is as follows:
+assign_partitions = ["shoe", "clothing"]
+
+Then a message containing "shoe" will be sent to partition zero, because "shoe" has index zero in assign_partitions, and a message containing "clothing" will be sent to partition one. All other messages will be hashed into the remaining partitions.
+
+This behavior is implemented by the `MessageContentPartitioner` class, which implements the `org.apache.kafka.clients.producer.Partitioner` interface. If you need custom partitioning, implement this interface as well. A configuration sketch follows.
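+
+A concrete sketch of the example above (broker address and topic are illustrative):
+
+```hocon
+sink {
+  kafka {
+    topic = "seatunnel"
+    bootstrap.servers = "localhost:9092"
+    # messages containing "shoe" go to partition 0, "clothing" to partition 1;
+    # all other messages are hashed across the remaining partitions
+    assign_partitions = ["shoe", "clothing"]
+  }
+}
+```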
+
+### transaction_prefix [string]
+
+If semantic is set to EXACTLY_ONCE, the producer writes all messages in a Kafka transaction.
+Kafka distinguishes transactions by their transactionId. This parameter is the prefix of the Kafka transactionId; make sure different jobs use different prefixes.
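+
+For example, a minimal sketch (the prefix value is illustrative):
+
+```hocon
+sink {
+  kafka {
+    topic = "seatunnel"
+    bootstrap.servers = "localhost:9092"
+    semantic = EXACTLY_ONCE
+    transaction_prefix = "seatunnel-job-1"
+  }
+}
+```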
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+
+  kafka {
+      topic = "seatunnel"
+      bootstrap.servers = "localhost:9092"
+      partition = 3
+      kafka.request.timeout.ms = 60000
+      semantic = EXACTLY_ONCE
+  }
+  
+}
+```
+
+
+### Change log
+
+#### Next version
+
+- Add Kafka sink doc
+- New feature: send messages to a specified Kafka partition
+- New feature: determine the Kafka partition based on message content
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index d48d12cf6..0502afda3 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -40,9 +40,8 @@ public class Config {
      */
     public static final String CONSUMER_GROUP = "consumer.group";
 
-
     /**
-     * consumer group of kafka client consume message.
+     * Consumer offsets will be periodically committed in the background.
      */
     public static final String COMMIT_ON_CHECKPOINT = "commit_on_checkpoint";
 
@@ -50,4 +49,14 @@ public class Config {
      * The prefix of kafka's transactionId, make sure different job use different prefix.
      */
     public static final String TRANSACTION_PREFIX = "transaction_prefix";
+
+    /**
+     * Send all messages to the specified partition.
+     */
+    public static final String PARTITION = "partition";
+
+    /**
+     * Determine the partition to send to based on the message content.
+     */
+    public static final String ASSIGN_PARTITIONS = "assign_partitions";
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index e80a7d306..29599bbdb 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 
 public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
 
+    private int partition = -1;
     private final String topic;
     private final JsonSerializationSchema jsonSerializationSchema;
 
@@ -33,8 +34,18 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byt
         this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType);
     }
 
+    public DefaultSeaTunnelRowSerializer(String topic, int partition, SeaTunnelRowType seaTunnelRowType) {
+        this(topic, seaTunnelRowType);
+        this.partition = partition;
+    }
+
     @Override
     public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
-        return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
+        if (this.partition != -1) {
+            return new ProducerRecord<>(topic, this.partition, null, jsonSerializationSchema.serialize(row));
+        } else {
+            return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index b577067ef..1f61482e7 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
 
@@ -51,6 +53,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     private String transactionPrefix;
     private long lastCheckpointId = 0;
+    private int partition;
 
     private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
     private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;
@@ -71,6 +74,12 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
             List<KafkaSinkState> kafkaStates) {
         this.context = context;
         this.pluginConfig = pluginConfig;
+        if (pluginConfig.hasPath(PARTITION)) {
+            this.partition = pluginConfig.getInt(PARTITION);
+        }
+        if (pluginConfig.hasPath(ASSIGN_PARTITIONS)) {
+            MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS));
+        }
         if (pluginConfig.hasPath(TRANSACTION_PREFIX)) {
             this.transactionPrefix = pluginConfig.getString(TRANSACTION_PREFIX);
         } else {
@@ -129,6 +138,9 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
         kafkaConfig.entrySet().forEach(entry -> {
             kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
         });
+        if (pluginConfig.hasPath(ASSIGN_PARTITIONS)) {
+            kafkaProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
+        }
         kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
@@ -137,7 +149,12 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
+        if (pluginConfig.hasPath(PARTITION)) {
+            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), this.partition, seaTunnelRowType);
+        } else {
+            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
+        }
     }
 
     private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/MessageContentPartitioner.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/MessageContentPartitioner.java
new file mode 100644
index 000000000..c5fa37729
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/MessageContentPartitioner.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.PartitionInfo;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+public class MessageContentPartitioner implements Partitioner {
+    private static List<String> ASSIGN_PARTITIONS;
+
+    public static void setAssignPartitions(List<String> assignPartitionList) {
+        ASSIGN_PARTITIONS = assignPartitionList;
+    }
+
+    @Override
+    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
+        int numPartitions = partitions.size();
+
+        int assignPartitionsSize = ASSIGN_PARTITIONS.size();
+        String message = new String(valueBytes, StandardCharsets.UTF_8);
+        // A message containing the i-th keyword is routed to partition i.
+        for (int i = 0; i < assignPartitionsSize; i++) {
+            if (message.contains(ASSIGN_PARTITIONS.get(i))) {
+                return i;
+            }
+        }
+        // Hash all other messages into one of the remaining partitions.
+        return ((message.hashCode() & Integer.MAX_VALUE) % (numPartitions - assignPartitionsSize)) + assignPartitionsSize;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}