Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/13 03:07:13 UTC

[GitHub] [incubator-seatunnel] TaoZex opened a new pull request, #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

TaoZex opened a new pull request, #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085

   ## Purpose of this pull request
   
   Issue: https://github.com/apache/incubator-seatunnel/issues/2787
   
   The key is specified by field name. The partition that a Kafka message is sent to is determined by that key.
   
   ## Check list
   
   * [ ] Code changes are covered by tests, or do not need tests for reason:
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r996322486


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -74,6 +92,7 @@ public KafkaSinkWriter(
             List<KafkaSinkState> kafkaStates) {
         this.context = context;
         this.pluginConfig = pluginConfig;
+        this.seaTunnelRowType = seaTunnelRowType;

Review Comment:
   ```suggestion
           this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);
   ```
   
   
   Add this method
   
   ```java
       private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
                                                                       SeaTunnelRowType seaTunnelRowType) {
           if (!pluginConfig.hasPath(PARTITION_KEY)){
               return row -> null;
           }
           String partitionKey = pluginConfig.getString(PARTITION_KEY);
           List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
           if (!fieldNames.contains(partitionKey)) {
               return row -> partitionKey;
           }
           int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
           return row -> {
               Object partitionFieldValue = row.getField(partitionFieldIndex);
               if (partitionFieldValue != null) {
                   return partitionFieldValue.toString();
               }
               return null;
           };
       }
   ```
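The extractor suggested above can be tried outside SeaTunnel with a minimal sketch. It assumes plain Java stand-ins: a `String[]` of field names in place of `SeaTunnelRowType`, an `Object[]` in place of `SeaTunnelRow`, and a nullable `String` in place of the `Config` lookup. The class name `PartitionExtractorSketch` is hypothetical and not part of the PR.

```java
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical stand-in for the suggested method: rows are Object[] and the
// row type is a String[] of field names, so no SeaTunnel classes are needed.
final class PartitionExtractorSketch {

    static Function<Object[], String> createPartitionExtractor(String partitionKey, String[] fieldNames) {
        if (partitionKey == null) {
            // Option not configured: every row gets a null key.
            return row -> null;
        }
        int index = Arrays.asList(fieldNames).indexOf(partitionKey);
        if (index < 0) {
            // Not a field name: the configured value itself is used as a constant key.
            return row -> partitionKey;
        }
        // Field name: resolve the index once, then read the value per row.
        return row -> {
            Object value = row[index];
            return value == null ? null : value.toString();
        };
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "name"};
        Function<Object[], String> byField = createPartitionExtractor("name", fieldNames);
        System.out.println(byField.apply(new Object[]{1, "alice"})); // prints "alice"
        System.out.println(byField.apply(new Object[]{2, null}));    // prints "null" (the key is a null reference)
        Function<Object[], String> constant = createPartitionExtractor("fixed-key", fieldNames);
        System.out.println(constant.apply(new Object[]{3, "bob"}));  // prints "fixed-key"
    }
}
```

Building the function once in the constructor means the config lookup and the field-name search are not repeated for every row written.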



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -63,7 +66,22 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     // check config
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = null;
+        //Determine the partition of the kafka send message based on the field name
+        if (pluginConfig.hasPath(PARTITION_KEY)){
+            String keyField = pluginConfig.getString(PARTITION_KEY);
+            List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
+            String key;
+            if (fields.contains(keyField)) {
+                key = element.getField(fields.indexOf(keyField)).toString();
+            } else {
+                key = keyField;
+            }

Review Comment:
   ```suggestion
               String key = partitionExtractor.apply(element);
   ```





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r994630459


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -63,7 +66,22 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     // check config
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = null;
+        //Determine the partition of the kafka send message based on the field name
+        if (pluginConfig.hasPath(KEY)){
+            String keyField = pluginConfig.getString(KEY);
+            List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
+            String key;
+            if (fields.contains(keyField)) {
+                key = element.getField(fields.indexOf(keyField)).toString();

Review Comment:
   Don't we need to check for null?





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r996987792


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
 | bootstrap.servers  | string                 | yes      | -             |
 | kafka.*            | kafka producer config  | no       | -             |
 | semantic           | string                 | no       | NON           |
+| partition_key      | string                 | no       | -           |

Review Comment:
   ```suggestion
   | partition_key      | string                 | no       | -             |
   ```
   
   Checkstyle: keep the table column aligned.





[GitHub] [incubator-seatunnel] dijiekstra commented on pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
dijiekstra commented on PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#issuecomment-1281709907

   LGTM




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r995309605


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java:
##########
@@ -48,4 +48,10 @@ public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
             return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
         }
     }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
+        return new ProducerRecord<>(topic, key.getBytes(), jsonSerializationSchema.serialize(row));

Review Comment:
   > check key is null?
   
   I think we do not need to check whether `key` is null, because if the `key` is null, Kafka will send the message to a random partition (a new partition is selected at random every `topic.metadata.refresh.ms`).
   ```
   The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.
   ```





[GitHub] [incubator-seatunnel] ashulin merged pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
ashulin merged PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r996445899


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -63,7 +66,22 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     // check config
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = null;
+        //Determine the partition of the kafka send message based on the field name
+        if (pluginConfig.hasPath(PARTITION_KEY)){
+            String keyField = pluginConfig.getString(PARTITION_KEY);
+            List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
+            String key;
+            if (fields.contains(keyField)) {
+                key = element.getField(fields.indexOf(keyField)).toString();
+            } else {
+                key = keyField;
+            }

Review Comment:
   Hi @TaoZex, I think @hailin0 gave you a good suggestion.





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r995536895


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
 
 NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
 
+### partition_key [string]
+
+Determine the partition of the kafka send message based on the key.

Review Comment:
   It would be better to replace this with `Configure which field is used as the key of the kafka message`.



##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -93,7 +111,9 @@ sink {
 
 ###  change log
 ####  next version
- 
+
  - Add kafka sink doc 
  - New feature : Kafka specified partition to send 
- - New feature : Determine the partition that kafka send based on the message content
+ - New feature : Determine the partition that kafka send messag based on the message content
+ - New feature : Determine the partition of the kafka send message based on the field name

Review Comment:
   Would it be better to replace this with `Configure which field is used as the key of the kafka message`?



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -63,7 +66,28 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     // check config
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = null;
+        //Determine the partition of the kafka send message based on the field name
+        if (pluginConfig.hasPath(PARTITION_KEY)){
+            String keyField = pluginConfig.getString(PARTITION_KEY);
+            List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
+            String key;
+            if (fields.contains(keyField)) {
+                Object field = element.getField(fields.indexOf(keyField));
+                //If the field is null, send the message to the same partition
+                if (field == null) {
+                    key = "null";

Review Comment:
   This is not a good approach, because there may be a lot of `null` values. If you convert `null` to "null", all of the `null` values will be written to the same partition.
   
   You can keep `null` and update the code in `seaTunnelRowSerializer.serializeRowByKey(key, element)` like this:
   ```
   return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
   ```
   
   If the key is null, Kafka will send the message to a random partition (a new partition is selected at random every `topic.metadata.refresh.ms`).
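The concern about the literal "null" key can be illustrated with a small sketch. For keyed records, Kafka's default partitioner hashes the key bytes (with murmur2) and takes the result modulo the partition count, so equal keys always map to the same partition. The sketch below assumes `Arrays.hashCode` as a stand-in hash purely for illustration; the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of key-based partitioning; not Kafka's real partitioner.
final class KeyedPartitionSketch {

    // Stand-in hash: Kafka uses murmur2 on the key bytes, Arrays.hashCode is used
    // here only to show that the partition is a pure function of the key.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        byte[] literalNull = "null".getBytes(StandardCharsets.UTF_8);
        // Every row whose null field was turned into the string "null"
        // produces the same key bytes and therefore the same partition.
        for (int i = 0; i < 3; i++) {
            System.out.println("row " + i + " -> partition " + partitionFor(literalNull, numPartitions));
        }
    }
}
```

Leaving the key as a real `null` avoids this hot partition, because unkeyed records are not routed by a key hash.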
   



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java:
##########
@@ -48,4 +48,10 @@ public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
             return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
         }
     }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
+        return new ProducerRecord<>(topic, key.getBytes(), jsonSerializationSchema.serialize(row));

Review Comment:
   I am sorry, you do need to check whether `key` is null, because you call `key.getBytes()`. You can update it to:
   `return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));`
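The null guard from this comment can be isolated in a small helper, sketched here without any Kafka dependency. The helper name `toKeyBytes` is hypothetical, and the explicit UTF-8 charset is an assumption added for illustration: the line in the PR calls `key.getBytes()`, which uses the platform default charset.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing the null-safe key conversion discussed above.
final class NullSafeKeySketch {

    // Returns null for a null key, so the record is sent without a key
    // instead of failing with a NullPointerException on key.getBytes().
    static byte[] toKeyBytes(String key) {
        return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toKeyBytes(null) == null);    // prints "true"
        System.out.println(toKeyBytes("user-1").length); // prints "6"
    }
}
```

The returned array (or `null`) would then be passed as the key argument of `new ProducerRecord<>(topic, keyBytes, valueBytes)`.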





[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
TaoZex commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r995556541


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -63,7 +66,28 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     // check config
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = null;
+        //Determine the partition of the kafka send message based on the field name
+        if (pluginConfig.hasPath(PARTITION_KEY)){
+            String keyField = pluginConfig.getString(PARTITION_KEY);
+            List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
+            String key;
+            if (fields.contains(keyField)) {
+                Object field = element.getField(fields.indexOf(keyField));
+                //If the field is null, send the message to the same partition
+                if (field == null) {
+                    key = "null";

Review Comment:
   That's a good approach, thanks for your advice.





[GitHub] [incubator-seatunnel] TaoZex commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
TaoZex commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r996638635


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -63,7 +66,22 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     // check config
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = null;
+        //Determine the partition of the kafka send message based on the field name
+        if (pluginConfig.hasPath(PARTITION_KEY)){
+            String keyField = pluginConfig.getString(PARTITION_KEY);
+            List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
+            String key;
+            if (fields.contains(keyField)) {
+                key = element.getField(fields.indexOf(keyField)).toString();
+            } else {
+                key = keyField;
+            }

Review Comment:
   Thanks to @EricJoy2048 and @hailin0 for the good suggestion, I will fix it.





[GitHub] [incubator-seatunnel] TaoZex commented on pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
TaoZex commented on PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#issuecomment-1277552794

   @hailin0 PTAL




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3085: [Feature][Connector-V2] Support extract partition from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r994684494


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java:
##########
@@ -48,4 +48,10 @@ public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
             return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
         }
     }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
+        return new ProducerRecord<>(topic, key.getBytes(), jsonSerializationSchema.serialize(row));

Review Comment:
   check key is null?



##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -50,6 +51,21 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
 
 NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
 
+### key [string]
+
+Determine the partition of the kafka send message based on the key.

Review Comment:
   Add `support config field name`



##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
 | bootstrap.servers  | string                 | yes      | -             |
 | kafka.*            | kafka producer config  | no       | -             |
 | semantic           | string                 | no       | NON           |
+| key                | string                 | no       | -           |

Review Comment:
   ```suggestion
   | partition_key      | string                 | no       |-              |
   ```
   
   


