You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/29 14:38:04 UTC

[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

EricJoy2048 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1008710151


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,23 +209,39 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
-    private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
-                                                                    SeaTunnelRowType seaTunnelRowType) {
-        if (!pluginConfig.hasPath(PARTITION_KEY)){
+    private Function<SeaTunnelRow, SeaTunnelRow> createPartitionExtractor(SeaTunnelRowType seaTunnelRowType) {
+        if (CollectionUtils.isEmpty(this.partitionKeys)) {
             return row -> null;
         }
-        String partitionKey = pluginConfig.getString(PARTITION_KEY);
-        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
-        if (!fieldNames.contains(partitionKey)) {
-            return row -> partitionKey;
-        }
-        int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
         return row -> {
-            Object partitionFieldValue = row.getField(partitionFieldIndex);
-            if (partitionFieldValue != null) {
-                return partitionFieldValue.toString();
+            SeaTunnelRow keySeaTunnelRow = new SeaTunnelRow(this.partitionKeys.size());
+            int index = 0;
+            for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+                String fieldName = seaTunnelRowType.getFieldNames()[i];
+                if (this.partitionKeys.contains(fieldName)) {
+                    int partitionFieldIndex = seaTunnelRowType.indexOf(fieldName);
+                    Object partitionFieldValue = row.getField(partitionFieldIndex);
+                    keySeaTunnelRow.setField(index, partitionFieldValue);
+                    ++index;
+                }
             }
-            return null;
+            return keySeaTunnelRow;
         };
     }
+
+    private List<String> createPartitionKeys(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        if (pluginConfig.hasPath(PARTITION_KEY)) {
+            return pluginConfig.getStringList(PARTITION_KEY).stream()
+                    .filter(f -> {
+                        if (Arrays.asList(seaTunnelRowType.getFieldNames()).contains(f)) {
+                            return true;
+                        } else {

Review Comment:
   The `else` is redundant: the `if` branch already returns, so the `else` can be dropped and its body un-nested.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -162,11 +170,24 @@ private Properties getKafkaProperties(Config pluginConfig) {
 
     // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (pluginConfig.hasPath(PARTITION)){
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), this.partition, seaTunnelRowType);
-        }
-        else {
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
+        if (pluginConfig.hasPath(PARTITION)) {
+            return new DefaultSeaTunnelRowSerializer(this.topic, this.partition, seaTunnelRowType);
+        } else if (CollectionUtils.isNotEmpty(this.partitionKeys)) {
+            int size = this.partitionKeys.size();
+            String[] keyFieldNames = new String[size];
+            SeaTunnelDataType<?>[] keyFieldTypes = new SeaTunnelDataType[size];
+            int index = 0;
+            for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+                if (this.partitionKeys.contains(seaTunnelRowType.getFieldNames()[i])) {
+                    keyFieldNames[index] = seaTunnelRowType.getFieldName(i);
+                    keyFieldTypes[index] = seaTunnelRowType.getFieldType(i);
+                    ++index;
+                }
+            }
+            SeaTunnelRowType keySeaTunnelRowType = new SeaTunnelRowType(keyFieldNames, keyFieldTypes);
+            return new DefaultSeaTunnelRowSerializer(this.topic, keySeaTunnelRowType, seaTunnelRowType);
+        } else {

Review Comment:
   Since every branch returns, I suggest replacing
   ```
   if () {
      return xx;
   } else if () {
     return xx;
   } else {
     return xx;
   }
   ```
   
   with early returns:
   
   ```
   if () {
       return xx;
   }
   
   if () {
     return xx;
   }
   
   return xx;
   ```



##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -51,7 +51,7 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
 
 NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
 
-### partition_key [string]
+### partition_key [array]
 
 Configure which field is used as the key of the kafka message.
 

Review Comment:
   Please add a changelog (`changed logs`) section to this document, following the example in https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/connector-v2/source/HdfsFile.md



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org