Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/19 16:35:33 UTC

[GitHub] [incubator-seatunnel] TaoZex opened a new pull request, #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields

TaoZex opened a new pull request, #3147:
URL: https://github.com/apache/incubator-seatunnel/pull/3147

   ## Purpose of this pull request
   
   Support extracting the topic from SeaTunnelRow fields: the value of a field from the upstream data is used as the topic. The format is ${field_topic}, where field_topic stands for the name of one of the columns of the upstream data.
   
   ## Check list
   
   * [ ] Code changes are covered by tests, or tests are not needed for this reason:
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] TaoZex closed pull request #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
TaoZex closed pull request #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields
URL: https://github.com/apache/incubator-seatunnel/pull/3147




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3147:
URL: https://github.com/apache/incubator-seatunnel/pull/3147#discussion_r1000534502


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -31,6 +31,12 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
 
 Kafka Topic.
 
+Currently two formats are supported:
+
+1. Fill in the name of the topic
+
+2. Use value of a field from upstream data as topic,the format is ${field_topic}, where topic is the name of one of the columns of the upstream data.

Review Comment:
   ```suggestion
   2. Use value of a field from upstream data as topic,the format is ${your field name}, where topic is the value of one of the columns of the upstream data.
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -86,6 +95,8 @@ public KafkaSinkWriter(
             List<KafkaSinkState> kafkaStates) {
         this.context = context;
         this.pluginConfig = pluginConfig;
+        this.topic = extractTopic(pluginConfig.getString(TOPIC));
+        this.topicExtractor = createTopicExtractor(this.topic, seaTunnelRowType);

Review Comment:
   ```suggestion
           this.topicExtractor = createTopicExtractor(pluginConfig.getString(TOPIC), seaTunnelRowType);
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -54,10 +56,13 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     private final SinkWriter.Context context;
     private final Config pluginConfig;
     private final Function<SeaTunnelRow, String> partitionExtractor;
+    private final Function<SeaTunnelRow, String> topicExtractor;
 
     private String transactionPrefix;
     private long lastCheckpointId = 0;
     private int partition;
+    private String topic;
+    private boolean isExtractTopic = false;

Review Comment:
   remove



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
+    private Function<SeaTunnelRow, String> createTopicExtractor(String topicField, SeaTunnelRowType seaTunnelRowType) {
+        if (!isExtractTopic) {
+            return row -> null;
+        }

Review Comment:
   ```suggestion
       private Function<SeaTunnelRow, String> createTopicExtractor(String topicConfig, SeaTunnelRowType seaTunnelRowType) {
           String regex = "\\$\\{(.*?)\\}";
           Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
           Matcher matcher = pattern.matcher(topicConfig);
           if (!matcher.find()) {
               return row -> topicConfig;
           }
           String topicField = matcher.group(1);
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
+    private Function<SeaTunnelRow, String> createTopicExtractor(String topicField, SeaTunnelRowType seaTunnelRowType) {
+        if (!isExtractTopic) {
+            return row -> null;
+        }
+        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+        if (!fieldNames.contains(topicField)) {
+            throw new IllegalArgumentException("Field name is not found!");

Review Comment:
   ```suggestion
               throw new IllegalArgumentException("Field name{" +  topicField + "} is not found!");
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
+    private Function<SeaTunnelRow, String> createTopicExtractor(String topicField, SeaTunnelRowType seaTunnelRowType) {
+        if (!isExtractTopic) {
+            return row -> null;
+        }
+        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+        if (!fieldNames.contains(topicField)) {
+            throw new IllegalArgumentException("Field name is not found!");
+        }
+        int topicFieldIndex = seaTunnelRowType.indexOf(topicField);
+        return row -> {
+            Object topicFieldValue = row.getField(topicFieldIndex);
+            if (topicFieldValue == null) {
+                throw new IllegalArgumentException("The column value is empty!");

Review Comment:
   ```suggestion
                   throw new IllegalArgumentException("The field{" + topicField + "} value is empty!");
   ```
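
Taken together, the suggestions in this review amount to one extractor that handles both a fixed topic name and a `${field}` placeholder. The following is a minimal sketch of that idea, not the code that was merged. To stay self-contained it assumes a row is a plain `Object[]` and the schema is a `String[]` of field names, standing in for `SeaTunnelRow` and `SeaTunnelRowType`; the class name `TopicExtractorSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-in sketch: Object[] replaces SeaTunnelRow, String[] replaces SeaTunnelRowType.
class TopicExtractorSketch {

    // Same pattern as in the review suggestion: captures the text inside ${...}.
    private static final Pattern FIELD_PATTERN =
            Pattern.compile("\\$\\{(.*?)\\}", Pattern.DOTALL);

    static Function<Object[], String> createTopicExtractor(String topicConfig, String[] fieldNames) {
        Matcher matcher = FIELD_PATTERN.matcher(topicConfig);
        if (!matcher.find()) {
            // No placeholder: the configured value is a fixed topic name.
            return row -> topicConfig;
        }
        String topicField = matcher.group(1);
        // Resolve the field position once, when the extractor is created.
        int topicFieldIndex = Arrays.asList(fieldNames).indexOf(topicField);
        if (topicFieldIndex < 0) {
            throw new IllegalArgumentException("Field name{" + topicField + "} is not found!");
        }
        return row -> {
            Object topicFieldValue = row[topicFieldIndex];
            if (topicFieldValue == null) {
                throw new IllegalArgumentException("The field{" + topicField + "} value is empty!");
            }
            return topicFieldValue.toString();
        };
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "city"};
        Object[] row = {1, "berlin"};
        // Fixed topic name, then topic taken from the "city" column.
        System.out.println(createTopicExtractor("fixed_topic", fieldNames).apply(row));
        System.out.println(createTopicExtractor("${city}", fieldNames).apply(row));
    }
}
```

Because the pattern is applied with `find()`, any text around the placeholder is ignored in this sketch: a configuration such as `prefix_${city}` would resolve to the value of `city` alone.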





[GitHub] [incubator-seatunnel] iture123 commented on pull request #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
iture123 commented on PR #3147:
URL: https://github.com/apache/incubator-seatunnel/pull/3147#issuecomment-1310041798

   I suggest supporting multiple fields, and supporting a combination of fixed strings and field names, such as topic = "topic_${fieldName1}_${fieldName2}". For reference, see https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
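
One way to picture the multi-field suggestion is a template resolver that substitutes every `${field}` placeholder and keeps the fixed text around it. The sketch below is an assumption about how that could look. It is not taken from the referenced `IndexSerializerFactory` and not from this PR. As a simplification, a row is an `Object[]` and the schema a `String[]` of field names; the names `TopicTemplateSketch` and `createTopicResolver` are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Stand-in sketch: Object[] replaces SeaTunnelRow, String[] replaces SeaTunnelRowType.
class TopicTemplateSketch {

    private static final Pattern FIELD_PATTERN = Pattern.compile("\\$\\{(.*?)\\}");

    static Function<Object[], String> createTopicResolver(String template, String[] fieldNames) {
        List<String> names = Arrays.asList(fieldNames);
        // Validate every placeholder once, so a bad config fails before any row is written.
        Matcher check = FIELD_PATTERN.matcher(template);
        while (check.find()) {
            if (!names.contains(check.group(1))) {
                throw new IllegalArgumentException("Field name{" + check.group(1) + "} is not found!");
            }
        }
        return row -> {
            Matcher matcher = FIELD_PATTERN.matcher(template);
            StringBuffer topic = new StringBuffer();
            while (matcher.find()) {
                String field = matcher.group(1);
                Object value = row[names.indexOf(field)];
                if (value == null) {
                    throw new IllegalArgumentException("The field{" + field + "} value is empty!");
                }
                // quoteReplacement keeps '$' and '\' in the value from being read as group references.
                matcher.appendReplacement(topic, Matcher.quoteReplacement(value.toString()));
            }
            matcher.appendTail(topic);
            return topic.toString();
        };
    }

    public static void main(String[] args) {
        String[] fieldNames = {"id", "country", "city"};
        Object[] row = {1, "de", "berlin"};
        System.out.println(createTopicResolver("topic_${country}_${city}", fieldNames).apply(row));
    }
}
```

A template without placeholders passes through unchanged, so a fixed topic name needs no separate code path in this sketch.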




[GitHub] [incubator-seatunnel] TaoZex closed pull request #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
TaoZex closed pull request #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields
URL: https://github.com/apache/incubator-seatunnel/pull/3147




[GitHub] [incubator-seatunnel] TaoZex commented on pull request #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields

Posted by GitBox <gi...@apache.org>.
TaoZex commented on PR #3147:
URL: https://github.com/apache/incubator-seatunnel/pull/3147#issuecomment-1310068354

   > I suggest supporting multiple fields, and supporting a combination of fixed strings and field names, such as topic = "topic_${fieldName1}_${fieldName2}". For reference, see https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/index/IndexSerializerFactory.java
   
   Thanks for your advice.

