Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/20 12:25:53 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3147: [Feature][Connector-V2] Support extract topic from SeaTunnelRow fields

hailin0 commented on code in PR #3147:
URL: https://github.com/apache/incubator-seatunnel/pull/3147#discussion_r1000534502


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -31,6 +31,12 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
 
 Kafka Topic.
 
+Currently two formats are supported:
+
+1. Fill in the name of the topic
+
+2. Use value of a field from upstream data as topic,the format is ${field_topic}, where topic is the name of one of the columns of the upstream data.

Review Comment:
   ```suggestion
   2. Use the value of a field from upstream data as the topic. The format is ${your field name}, where the topic is the value of one of the columns of the upstream data.
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -86,6 +95,8 @@ public KafkaSinkWriter(
             List<KafkaSinkState> kafkaStates) {
         this.context = context;
         this.pluginConfig = pluginConfig;
+        this.topic = extractTopic(pluginConfig.getString(TOPIC));
+        this.topicExtractor = createTopicExtractor(this.topic, seaTunnelRowType);

Review Comment:
   ```suggestion
           this.topicExtractor = createTopicExtractor(pluginConfig.getString(TOPIC), seaTunnelRowType);
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -54,10 +56,13 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     private final SinkWriter.Context context;
     private final Config pluginConfig;
     private final Function<SeaTunnelRow, String> partitionExtractor;
+    private final Function<SeaTunnelRow, String> topicExtractor;
 
     private String transactionPrefix;
     private long lastCheckpointId = 0;
     private int partition;
+    private String topic;
+    private boolean isExtractTopic = false;

Review Comment:
   remove



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
+    private Function<SeaTunnelRow, String> createTopicExtractor(String topicField, SeaTunnelRowType seaTunnelRowType) {
+        if (!isExtractTopic) {
+            return row -> null;
+        }

Review Comment:
   ```suggestion
       private Function<SeaTunnelRow, String> createTopicExtractor(String topicConfig, SeaTunnelRowType seaTunnelRowType) {
           String regex = "\\$\\{(.*?)\\}";
           Pattern pattern = Pattern.compile(regex, Pattern.DOTALL);
           Matcher matcher = pattern.matcher(topicConfig);
           if (!matcher.find()) {
               return row -> topicConfig;
           }
           String topicField = matcher.group(1);
   ```
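
For reference, the placeholder parsing in this suggestion can be tried out in isolation. The following is only a minimal sketch: the class `TopicConfigParser` and the method `fieldName` are hypothetical names that are not part of the PR, and only the regular expression is taken from the suggestion above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the PR: isolates the ${...} parsing step.
final class TopicConfigParser {
    // Same expression as in the suggestion: a lazy match for ${...}.
    private static final Pattern FIELD_PLACEHOLDER =
            Pattern.compile("\\$\\{(.*?)\\}", Pattern.DOTALL);

    private TopicConfigParser() {
    }

    /** Returns the field name inside ${...}, or empty if the config is a literal topic name. */
    static Optional<String> fieldName(String topicConfig) {
        Matcher matcher = FIELD_PLACEHOLDER.matcher(topicConfig);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(fieldName("orders"));          // Optional.empty
        System.out.println(fieldName("${topic_column}")); // Optional[topic_column]
    }
}
```

Because the sketch uses `find()` rather than `matches()`, as the suggestion does, a placeholder embedded in a longer string (for example `prefix_${topic_column}`) is also treated as a field reference. Whether that is the intended behavior is worth confirming in the PR.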



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
+    private Function<SeaTunnelRow, String> createTopicExtractor(String topicField, SeaTunnelRowType seaTunnelRowType) {
+        if (!isExtractTopic) {
+            return row -> null;
+        }
+        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+        if (!fieldNames.contains(topicField)) {
+            throw new IllegalArgumentException("Field name is not found!");

Review Comment:
   ```suggestion
               throw new IllegalArgumentException("Field name{" + topicField + "} is not found!");
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,6 +211,24 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
+    private Function<SeaTunnelRow, String> createTopicExtractor(String topicField, SeaTunnelRowType seaTunnelRowType) {
+        if (!isExtractTopic) {
+            return row -> null;
+        }
+        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+        if (!fieldNames.contains(topicField)) {
+            throw new IllegalArgumentException("Field name is not found!");
+        }
+        int topicFieldIndex = seaTunnelRowType.indexOf(topicField);
+        return row -> {
+            Object topicFieldValue = row.getField(topicFieldIndex);
+            if (topicFieldValue == null) {
+                throw new IllegalArgumentException("The column value is empty!");

Review Comment:
   ```suggestion
                   throw new IllegalArgumentException("The field{" + topicField + "} value is empty!");
   ```
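
To illustrate how the two suggested messages behave together, here is a rough, self-contained sketch. It is not the PR's code: `TopicFieldLookup` is a hypothetical class, plain `String[]` and `Object[]` stand in for `SeaTunnelRowType` and `SeaTunnelRow`, and returning `value.toString()` is an assumption, since the diff hunk ends before the return statement.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in: String[] replaces SeaTunnelRowType, Object[] replaces SeaTunnelRow.
final class TopicFieldLookup {
    private TopicFieldLookup() {
    }

    static Function<Object[], String> extractor(String topicField, String[] fieldNames) {
        List<String> names = Arrays.asList(fieldNames);
        int index = names.indexOf(topicField);
        if (index < 0) {
            // Fails once, at construction time, and names the missing field.
            throw new IllegalArgumentException("Field name{" + topicField + "} is not found!");
        }
        return row -> {
            Object value = row[index];
            if (value == null) {
                // Fails per row and names the field whose value is missing.
                throw new IllegalArgumentException("The field{" + topicField + "} value is empty!");
            }
            // Assumption: the topic is the string form of the field value.
            return value.toString();
        };
    }
}
```

With the field name included, a misconfigured `topic` option can be traced from the exception text alone, without inspecting the job configuration.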


