Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/29 14:12:01 UTC

[GitHub] [incubator-seatunnel] harveyyue opened a new pull request, #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

harveyyue opened a new pull request, #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230

   
   <!--
   
   Thank you for contributing to SeaTunnel! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   ## Contribution Checklist
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-seatunnel/issues).
   
     - Name the pull request in the form "[Feature] [component] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Minor fixes should be named following this pattern: `[hotfix] [docs] Fix typo in README.md doc`.
   
   -->
   
   ## Purpose of this pull request
   
   <!-- Describe the purpose of this pull request. For example: This pull request adds checkstyle plugin.-->
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or do not need tests for this reason:
   * [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013796496


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   And the examples in the docs need to be updated as well
   
   <img width="1427" alt="image" src="https://user-images.githubusercontent.com/14371345/199939588-9530dcd7-ba6b-4c4c-b626-42591f96d255.png">
   





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1009591698


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -113,3 +113,6 @@ sink {
 ### 2.3.0-beta 2022-10-20
 
 - Add Kafka Sink Connector
+### next version
+
+- Support to specify multiple partition keys

Review Comment:
   ```suggestion
   - [Feature] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
   ```





[GitHub] [incubator-seatunnel] harveyyue commented on pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#issuecomment-1296043526

   @EricJoy2048 Done.




[GitHub] [incubator-seatunnel] hailin0 commented on pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#issuecomment-1302928860

   @TaoZex please help to review




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013589562


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   It is recommended to implement it this way, since it is simpler for users to use:
   
   partition_key = "${c_map},${c_string}"
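
   A minimal sketch (not code from this PR) of how such `${field}` placeholders could be resolved against a row. The helper class and method are hypothetical; `SeaTunnelRowType.indexOf(...)` and `SeaTunnelRow.getField(...)` are the calls already visible in the KafkaSinkWriter diff in this thread, and the import paths are assumed from the SeaTunnel API module.

   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   import org.apache.seatunnel.api.table.type.SeaTunnelRow;
   import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

   // Hypothetical helper: replaces each ${field} in the expression with the
   // row's value for that field, leaving literal text untouched.
   public class PartitionKeyExpressionSketch {
       private static final Pattern FIELD_REF = Pattern.compile("\\$\\{(\\w+)}");

       public static String evaluate(String expression, SeaTunnelRowType rowType, SeaTunnelRow row) {
           Matcher matcher = FIELD_REF.matcher(expression);
           StringBuffer key = new StringBuffer();
           while (matcher.find()) {
               Object value = row.getField(rowType.indexOf(matcher.group(1)));
               matcher.appendReplacement(key, Matcher.quoteReplacement(String.valueOf(value)));
           }
           matcher.appendTail(key);
           return key.toString(); // e.g. "${c_map},${c_string}" -> "<c_map value>,<c_string value>"
       }
   }
   ```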





[GitHub] [incubator-seatunnel] harveyyue commented on pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#issuecomment-1303850903

   @hailin0 Great suggestions! Actually, you would have to refactor the sink serialization.




[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013796496


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   And the examples & description in the docs need to be updated as well
   
   <img width="1427" alt="image" src="https://user-images.githubusercontent.com/14371345/199939588-9530dcd7-ba6b-4c4c-b626-42591f96d255.png">
   





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013591045


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   The value of partition_key can be a normal string or an expression that extracts the field content. We have already supported single-field extraction before:
   
   https://github.com/apache/incubator-seatunnel/pull/3085





[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013785167


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   > 
   BTW, a fixed-string partition key value will cause all of the data to be written to a single partition of the topic; specifying field-based partition key(s), or omitting them, is the better practice.
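
   A tiny self-contained illustration (not SeaTunnel or Kafka code) of why a constant key concentrates every record on one partition under key-hash partitioning; the hash below merely stands in for Kafka's murmur2, the modulo-by-partition-count idea is the same.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.Arrays;

   public class FixedKeyPartitioningSketch {
       // Stand-in for a key-hash partitioner: hash the key bytes, then modulo the partition count.
       static int partitionFor(byte[] keyBytes, int numPartitions) {
           return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
       }

       public static void main(String[] args) {
           byte[] fixedKey = "aaa".getBytes(StandardCharsets.UTF_8);
           // The same fixed key always hashes to the same partition, so every record lands there.
           System.out.println(partitionFor(fixedKey, 6));
           System.out.println(partitionFor(fixedKey, 6)); // identical to the line above
       }
   }
   ```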





[GitHub] [incubator-seatunnel] Hisoka-X commented on pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
Hisoka-X commented on PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#issuecomment-1308467058

   Please resolve the conflicts, thanks!




[GitHub] [incubator-seatunnel] ic4y merged pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
ic4y merged PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230




[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1008710151


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,23 +209,39 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
-    private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
-                                                                    SeaTunnelRowType seaTunnelRowType) {
-        if (!pluginConfig.hasPath(PARTITION_KEY)){
+    private Function<SeaTunnelRow, SeaTunnelRow> createPartitionExtractor(SeaTunnelRowType seaTunnelRowType) {
+        if (CollectionUtils.isEmpty(this.partitionKeys)) {
             return row -> null;
         }
-        String partitionKey = pluginConfig.getString(PARTITION_KEY);
-        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
-        if (!fieldNames.contains(partitionKey)) {
-            return row -> partitionKey;
-        }
-        int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
         return row -> {
-            Object partitionFieldValue = row.getField(partitionFieldIndex);
-            if (partitionFieldValue != null) {
-                return partitionFieldValue.toString();
+            SeaTunnelRow keySeaTunnelRow = new SeaTunnelRow(this.partitionKeys.size());
+            int index = 0;
+            for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+                String fieldName = seaTunnelRowType.getFieldNames()[i];
+                if (this.partitionKeys.contains(fieldName)) {
+                    int partitionFieldIndex = seaTunnelRowType.indexOf(fieldName);
+                    Object partitionFieldValue = row.getField(partitionFieldIndex);
+                    keySeaTunnelRow.setField(index, partitionFieldValue);
+                    ++index;
+                }
             }
-            return null;
+            return keySeaTunnelRow;
         };
     }
+
+    private List<String> createPartitionKeys(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        if (pluginConfig.hasPath(PARTITION_KEY)) {
+            return pluginConfig.getStringList(PARTITION_KEY).stream()
+                    .filter(f -> {
+                        if (Arrays.asList(seaTunnelRowType.getFieldNames()).contains(f)) {
+                            return true;
+                        } else {

Review Comment:
   `else` is redundant.
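
   A self-contained illustration of the point (the real filter body is partly elided in the diff above, so the warning below is only a placeholder): once the `if` branch returns, no `else` is needed.

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class RedundantElseSketch {
       static List<String> keepKnownFields(List<String> partitionKeys, String[] fieldNames) {
           List<String> names = Arrays.asList(fieldNames);
           return partitionKeys.stream()
                   .filter(f -> {
                       if (names.contains(f)) {
                           return true;
                       }
                       // control only reaches here when the check failed, so no `else` is needed
                       System.err.println("partition key is not a field of the row type: " + f);
                       return false;
                   })
                   .collect(Collectors.toList());
       }
   }
   ```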



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -162,11 +170,24 @@ private Properties getKafkaProperties(Config pluginConfig) {
 
     // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (pluginConfig.hasPath(PARTITION)){
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), this.partition, seaTunnelRowType);
-        }
-        else {
-            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
+        if (pluginConfig.hasPath(PARTITION)) {
+            return new DefaultSeaTunnelRowSerializer(this.topic, this.partition, seaTunnelRowType);
+        } else if (CollectionUtils.isNotEmpty(this.partitionKeys)) {
+            int size = this.partitionKeys.size();
+            String[] keyFieldNames = new String[size];
+            SeaTunnelDataType<?>[] keyFieldTypes = new SeaTunnelDataType[size];
+            int index = 0;
+            for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+                if (this.partitionKeys.contains(seaTunnelRowType.getFieldNames()[i])) {
+                    keyFieldNames[index] = seaTunnelRowType.getFieldName(i);
+                    keyFieldTypes[index] = seaTunnelRowType.getFieldType(i);
+                    ++index;
+                }
+            }
+            SeaTunnelRowType keySeaTunnelRowType = new SeaTunnelRowType(keyFieldNames, keyFieldTypes);
+            return new DefaultSeaTunnelRowSerializer(this.topic, keySeaTunnelRowType, seaTunnelRowType);
+        } else {

Review Comment:
   Suggest 
   ```
   if () {
      return xx;
   } else if () {
     return xx;
   } else {
     return xx;
   }
   ```
   
   replace to
   
   ```
   if () {
       return xx;
   }
   
   if () {
     return xx;
   }
   
   return xx;
   ```



##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -51,7 +51,7 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
 
 NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
 
-### partition_key [string]
+### partition_key [array]
 
 Configure which field is used as the key of the kafka message.
 

Review Comment:
   Please add `changed logs` reference https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/connector-v2/source/HdfsFile.md





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013780832


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   yes





[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013655984


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   @hailin0 You want to keep the logic of the following code snippet, right?
   ```
           if (!fieldNames.contains(partitionKey)) {
               return row -> partitionKey;
           }
   ```
   





[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013657086


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -84,51 +83,55 @@ public void startKafkaContainer() {
                 .pollInterval(500, TimeUnit.MILLISECONDS)
                 .atMost(180, TimeUnit.SECONDS)
                 .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
     }
 
-    @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-                new String[]{
-                        "id",
-                        "c_map",
-                        "c_array",
-                        "c_string",
-                        "c_boolean",
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_bytes",
-                        "c_date",
-                        "c_timestamp"
-                },
-                new SeaTunnelDataType[]{
-                        BasicType.LONG_TYPE,
-                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                        ArrayType.BYTE_ARRAY_TYPE,
-                        BasicType.STRING_TYPE,
-                        BasicType.BOOLEAN_TYPE,
-                        BasicType.BYTE_TYPE,
-                        BasicType.SHORT_TYPE,
-                        BasicType.INT_TYPE,
-                        BasicType.LONG_TYPE,
-                        BasicType.FLOAT_TYPE,
-                        BasicType.DOUBLE_TYPE,
-                        new DecimalType(2, 1),
-                        PrimitiveByteArrayType.INSTANCE,
-                        LocalTimeType.LOCAL_DATE_TYPE,
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE
-                }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+
+    @TestTemplate
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());

Review Comment:
   Add the consumer data size and key message schema check





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013792925


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   Perhaps renaming the config key to `partition_key_fields` would be more appropriate





[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013919420


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   done.





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013586275


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   How do you use a fixed string partition key value? e.g. partition_key = 'aaa'
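
   For context, the pre-change single-key logic (visible in the KafkaSinkWriter diff earlier in this thread) treated a partition_key that is not a field name as a fixed literal key. A self-contained sketch of that rule, with a plain Object[] standing in for SeaTunnelRow:

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.Function;

   public class FixedOrFieldKeySketch {
       static Function<Object[], String> keyExtractor(String partitionKey, String[] fieldNames) {
           List<String> names = Arrays.asList(fieldNames);
           if (!names.contains(partitionKey)) {
               return row -> partitionKey;            // not a field name: used verbatim as the key
           }
           int index = names.indexOf(partitionKey);
           return row -> row[index] == null ? null : row[index].toString(); // field name: key taken from the row
       }
   }
   ```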





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1009591698


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -113,3 +113,6 @@ sink {
 ### 2.3.0-beta 2022-10-20
 
 - Add Kafka Sink Connector
+### next version
+
+- Support to specify multiple partition keys

Review Comment:
   ```suggestion
   - Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
   ```





[GitHub] [incubator-seatunnel] EricJoy2048 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1009590076


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -113,3 +113,4 @@ sink {
 ### 2.3.0-beta 2022-10-20
 
 - Add Kafka Sink Connector
+- Support to specify multiple partition keys

Review Comment:
   ```suggestion
   ### next version
   
   - Support to specify multiple partition keys
   ```
   
   





[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#issuecomment-1308829280

   Hi @harveyyue, please resolve the conflicts.




[GitHub] [incubator-seatunnel] ic4y commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
ic4y commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1010276676


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -84,51 +83,55 @@ public void startKafkaContainer() {
                 .pollInterval(500, TimeUnit.MILLISECONDS)
                 .atMost(180, TimeUnit.SECONDS)
                 .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
     }
 
-    @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-                new String[]{
-                        "id",
-                        "c_map",
-                        "c_array",
-                        "c_string",
-                        "c_boolean",
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_bytes",
-                        "c_date",
-                        "c_timestamp"
-                },
-                new SeaTunnelDataType[]{
-                        BasicType.LONG_TYPE,
-                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                        ArrayType.BYTE_ARRAY_TYPE,
-                        BasicType.STRING_TYPE,
-                        BasicType.BOOLEAN_TYPE,
-                        BasicType.BYTE_TYPE,
-                        BasicType.SHORT_TYPE,
-                        BasicType.INT_TYPE,
-                        BasicType.LONG_TYPE,
-                        BasicType.FLOAT_TYPE,
-                        BasicType.DOUBLE_TYPE,
-                        new DecimalType(2, 1),
-                        PrimitiveByteArrayType.INSTANCE,
-                        LocalTimeType.LOCAL_DATE_TYPE,
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE
-                }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+
+    @TestTemplate
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());

Review Comment:
   Thank you for your contribution. In addition to checking that the job exits properly, you also need to check that the data was successfully written to Kafka
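
   A minimal sketch of the kind of check being asked for (not the PR's actual KafkaIT code; the consumer setup and group id below are assumptions): after the sink job finishes, poll the topic and assert the expected record count.

   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;

   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.ByteArrayDeserializer;
   import org.junit.jupiter.api.Assertions;

   public class KafkaSinkCheckSketch {
       static void assertRecordCount(String bootstrapServers, String topic, int expectedCount) {
           Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-e2e-check");
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
           try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
               consumer.subscribe(Collections.singletonList(topic));
               // A single poll with a generous timeout; a real test would loop until the count stabilizes.
               ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(10));
               Assertions.assertEquals(expectedCount, records.count());
           }
       }
   }
   ```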



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -84,51 +83,55 @@ public void startKafkaContainer() {
                 .pollInterval(500, TimeUnit.MILLISECONDS)
                 .atMost(180, TimeUnit.SECONDS)
                 .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
     }
 
-    @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-                new String[]{
-                        "id",
-                        "c_map",
-                        "c_array",
-                        "c_string",
-                        "c_boolean",
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_bytes",
-                        "c_date",
-                        "c_timestamp"
-                },
-                new SeaTunnelDataType[]{
-                        BasicType.LONG_TYPE,
-                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                        ArrayType.BYTE_ARRAY_TYPE,
-                        BasicType.STRING_TYPE,
-                        BasicType.BOOLEAN_TYPE,
-                        BasicType.BYTE_TYPE,
-                        BasicType.SHORT_TYPE,
-                        BasicType.INT_TYPE,
-                        BasicType.LONG_TYPE,
-                        BasicType.FLOAT_TYPE,
-                        BasicType.DOUBLE_TYPE,
-                        new DecimalType(2, 1),
-                        PrimitiveByteArrayType.INSTANCE,
-                        LocalTimeType.LOCAL_DATE_TYPE,
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE
-                }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+
+    @TestTemplate
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaTextToConsole(TestContainer container) throws IOException, InterruptedException {
+        TextSerializationSchema serializer = TextSerializationSchema.builder()
+                .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                .delimiter(",")
+                .build();
+        generateTestData(row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)));
+        Container.ExecResult execResult = container.executeJob("/kafkasource_text_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaJsonToConsole(TestContainer container) throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE);
+        generateTestData(row -> serializer.serializeRow(row));
+        Container.ExecResult execResult = container.executeJob("/kafkasource_json_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Review Comment:
   Ditto



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -84,51 +83,55 @@ public void startKafkaContainer() {
                 .pollInterval(500, TimeUnit.MILLISECONDS)
                 .atMost(180, TimeUnit.SECONDS)
                 .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
     }
 
-    @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-                new String[]{
-                        "id",
-                        "c_map",
-                        "c_array",
-                        "c_string",
-                        "c_boolean",
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_bytes",
-                        "c_date",
-                        "c_timestamp"
-                },
-                new SeaTunnelDataType[]{
-                        BasicType.LONG_TYPE,
-                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                        ArrayType.BYTE_ARRAY_TYPE,
-                        BasicType.STRING_TYPE,
-                        BasicType.BOOLEAN_TYPE,
-                        BasicType.BYTE_TYPE,
-                        BasicType.SHORT_TYPE,
-                        BasicType.INT_TYPE,
-                        BasicType.LONG_TYPE,
-                        BasicType.FLOAT_TYPE,
-                        BasicType.DOUBLE_TYPE,
-                        new DecimalType(2, 1),
-                        PrimitiveByteArrayType.INSTANCE,
-                        LocalTimeType.LOCAL_DATE_TYPE,
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE
-                }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+
+    @TestTemplate
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaTextToConsole(TestContainer container) throws IOException, InterruptedException {
+        TextSerializationSchema serializer = TextSerializationSchema.builder()
+                .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                .delimiter(",")
+                .build();
+        generateTestData(row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)));
+        Container.ExecResult execResult = container.executeJob("/kafkasource_text_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Review Comment:
   Ditto, it is suggested to use AssertSink or fileSink to check whether the output data is as expected





[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013791073


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   I agree
   
   I'm worried about breaking the released feature



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] harveyyue commented on pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#issuecomment-1311913796

   Sorry for the late response; I have fixed the conflicts.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013680834


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -84,51 +83,55 @@ public void startKafkaContainer() {
                 .pollInterval(500, TimeUnit.MILLISECONDS)
                 .atMost(180, TimeUnit.SECONDS)
                 .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
     }
 
-    @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-                new String[]{
-                        "id",
-                        "c_map",
-                        "c_array",
-                        "c_string",
-                        "c_boolean",
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_bytes",
-                        "c_date",
-                        "c_timestamp"
-                },
-                new SeaTunnelDataType[]{
-                        BasicType.LONG_TYPE,
-                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                        ArrayType.BYTE_ARRAY_TYPE,
-                        BasicType.STRING_TYPE,
-                        BasicType.BOOLEAN_TYPE,
-                        BasicType.BYTE_TYPE,
-                        BasicType.SHORT_TYPE,
-                        BasicType.INT_TYPE,
-                        BasicType.LONG_TYPE,
-                        BasicType.FLOAT_TYPE,
-                        BasicType.DOUBLE_TYPE,
-                        new DecimalType(2, 1),
-                        PrimitiveByteArrayType.INSTANCE,
-                        LocalTimeType.LOCAL_DATE_TYPE,
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE
-                }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+
+    @TestTemplate
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaTextToConsole(TestContainer container) throws IOException, InterruptedException {
+        TextSerializationSchema serializer = TextSerializationSchema.builder()
+                .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                .delimiter(",")
+                .build();
+        generateTestData(row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)));
+        Container.ExecResult execResult = container.executeJob("/kafkasource_text_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaJsonToConsole(TestContainer container) throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE);
+        generateTestData(row -> serializer.serializeRow(row));
+        Container.ExecResult execResult = container.executeJob("/kafkasource_json_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Review Comment:
   Already using AssertSink.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013785167


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf:
##########
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+
+    #spark config
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 1
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, smallint>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9092"
+    topic = "test_topic"
+    partition_key = ["c_map","c_string"]

Review Comment:
   BTW, a fixed string partition key value will cause all of the data to be written to a single partition of the topic; specifying row field(s) as the partition key, or omitting the key entirely, is better practice.
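
   A minimal sketch of the key hashing that Kafka's default partitioner applies to keyed records (not SeaTunnel code; the partition count and key values are illustrative) shows why a constant key pins every record to one partition while a key derived from row fields spreads records out:

   ```java
   import java.nio.charset.StandardCharsets;

   import org.apache.kafka.common.utils.Utils;

   class KeyPartitioningSketch {
       // Same hash-and-modulo step the default partitioner applies to a record key.
       static int partitionFor(byte[] keyBytes, int numPartitions) {
           return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
       }

       public static void main(String[] args) {
           byte[] fixedKey = "some_constant_key".getBytes(StandardCharsets.UTF_8);
           // A constant key hashes to the same partition on every call.
           System.out.println(partitionFor(fixedKey, 6));
           System.out.println(partitionFor(fixedKey, 6));

           // A key built from row fields varies per record, so records spread across partitions.
           for (String rowKey : new String[]{"user-1", "user-2", "user-3"}) {
               System.out.println(rowKey + " -> " + partitionFor(rowKey.getBytes(StandardCharsets.UTF_8), 6));
           }
       }
   }
   ```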



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013934844


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/log4j.properties:
##########
@@ -0,0 +1,22 @@
+#

Review Comment:
   Remove this file.
   
   This is the common configuration of e2e
   https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-e2e/seatunnel-e2e-common/src/test/resources/log4j2.properties



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.kafka;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+@Slf4j
+public class KafkaIT extends TestSuiteBase implements TestResource {
+    private static final int KAFKA_PORT = 9093;
+
+    private static final String KAFKA_HOST = "kafkaCluster";
+
+    private KafkaProducer<byte[], byte[]> producer;
+
+    private KafkaContainer kafkaContainer;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+                .withNetwork(NETWORK)
+                .withNetworkAliases(KAFKA_HOST)
+                .withLogConsumer(new Slf4jLogConsumer(log));

Review Comment:
   ```suggestion
                   .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,23 +207,32 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
-    private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
-                                                                    SeaTunnelRowType seaTunnelRowType) {
-        if (!pluginConfig.hasPath(PARTITION_KEY)){
+    private Function<SeaTunnelRow, SeaTunnelRow> createPartitionExtractor(SeaTunnelRowType seaTunnelRowType) {

Review Comment:
   suggestion
   
   remove this method



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java:
##########
@@ -25,34 +25,42 @@
 
 public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {

Review Comment:
   suggestion
   
   ```java
   public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
   
       private Integer partition;
       private final String topic;
       private final SerializationSchema keySerialization;
       private final SerializationSchema valueSerialization;
   
       public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType) {
           this(topic, element -> null, createSerializationSchema(seaTunnelRowType));
       }
   
       public DefaultSeaTunnelRowSerializer(String topic, Integer partition, SeaTunnelRowType seaTunnelRowType) {
           this(topic, seaTunnelRowType);
           this.partition = partition;
       }
   
       public DefaultSeaTunnelRowSerializer(String topic,
                                            List<String> keyFieldNames,
                                            SeaTunnelRowType seaTunnelRowType) {
           this(topic, createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
               createSerializationSchema(seaTunnelRowType));
       }
   
       public DefaultSeaTunnelRowSerializer(String topic,
                                            SerializationSchema keySerialization,
                                            SerializationSchema valueSerialization) {
           this.topic = topic;
           this.keySerialization = keySerialization;
           this.valueSerialization = valueSerialization;
       }
   
       @Override
       public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
           return new ProducerRecord<>(topic, partition,
               keySerialization.serialize(row), valueSerialization.serialize(row));
       }
   
       private static SerializationSchema createSerializationSchema(SeaTunnelRowType rowType) {
           return new JsonSerializationSchema(rowType);
       }
   
       private static SerializationSchema createKeySerializationSchema(List<String> keyFieldNames,
                                                                       SeaTunnelRowType seaTunnelRowType) {
           int[] keyFieldIndexArr = new int[keyFieldNames.size()];
           SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
           for (int i = 0; i < keyFieldNames.size(); i++) {
               String keyFieldName = keyFieldNames.get(i);
               int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
               keyFieldIndexArr[i] = rowFieldIndex;
               keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
           }
           SeaTunnelRowType keyType = new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
           SerializationSchema keySerializationSchema = createSerializationSchema(keyType);
   
           Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
               Object[] keyFields = new Object[keyFieldIndexArr.length];
               for (int i = 0; i < keyFieldIndexArr.length; i++) {
                   keyFields[i] = row.getField(keyFieldIndexArr[i]);
               }
               return new SeaTunnelRow(keyFields);
           };
           return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
       }
   }
   ```
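
   If the constructors sketched above are adopted as-is, wiring from the sink side might look roughly like this (hypothetical usage; the topic, field names, and key field choice are illustrative):

   ```java
   import java.util.Arrays;

   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.seatunnel.api.table.type.BasicType;
   import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
   import org.apache.seatunnel.api.table.type.SeaTunnelRow;
   import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
   import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;

   class SerializerUsageSketch {
       static ProducerRecord<byte[], byte[]> example() {
           SeaTunnelRowType rowType = new SeaTunnelRowType(
               new String[]{"c_int", "c_string"},
               new SeaTunnelDataType[]{BasicType.INT_TYPE, BasicType.STRING_TYPE});
           // Key bytes come from the selected key fields; value bytes are the whole row as JSON.
           DefaultSeaTunnelRowSerializer serializer =
               new DefaultSeaTunnelRowSerializer("test_topic", Arrays.asList("c_string"), rowType);
           return serializer.serializeRow(new SeaTunnelRow(new Object[]{1, "a"}));
       }
   }
   ```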



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -188,23 +207,32 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
-    private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
-                                                                    SeaTunnelRowType seaTunnelRowType) {
-        if (!pluginConfig.hasPath(PARTITION_KEY)){
+    private Function<SeaTunnelRow, SeaTunnelRow> createPartitionExtractor(SeaTunnelRowType seaTunnelRowType) {
+        if (CollectionUtils.isEmpty(this.partitionKeyFields)) {
             return row -> null;
         }
-        String partitionKey = pluginConfig.getString(PARTITION_KEY);
-        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
-        if (!fieldNames.contains(partitionKey)) {
-            return row -> partitionKey;
-        }
-        int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
         return row -> {
-            Object partitionFieldValue = row.getField(partitionFieldIndex);
-            if (partitionFieldValue != null) {
-                return partitionFieldValue.toString();
+            SeaTunnelRow keySeaTunnelRow = new SeaTunnelRow(this.partitionKeyFields.size());
+            int index = 0;
+            for (int i = 0; i < seaTunnelRowType.getFieldNames().length; i++) {
+                String fieldName = seaTunnelRowType.getFieldNames()[i];
+                if (this.partitionKeyFields.contains(fieldName)) {
+                    int partitionFieldIndex = seaTunnelRowType.indexOf(fieldName);
+                    Object partitionFieldValue = row.getField(partitionFieldIndex);
+                    keySeaTunnelRow.setField(index, partitionFieldValue);
+                    ++index;
+                }
             }
-            return null;
+            return keySeaTunnelRow;
         };
     }
+
+    private List<String> createPartitionKeys(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        if (pluginConfig.hasPath(PARTITION_KEY_FIELDS)) {
+            return pluginConfig.getStringList(PARTITION_KEY_FIELDS).stream()
+                    .filter(f -> Arrays.asList(seaTunnelRowType.getFieldNames()).contains(f))
+                    .collect(Collectors.toList());
+        }
+        return null;
+    }

Review Comment:
   suggestion
   
   ```java
       private List<String> getPartitionKeyFields(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
           if (pluginConfig.hasPath(PARTITION_KEY_FIELDS)) {
               List<String> partitionKeyFields = pluginConfig.getStringList(PARTITION_KEY_FIELDS);
               List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
               for (String partitionKeyField : partitionKeyFields) {
                   if (!rowTypeFieldNames.contains(partitionKeyField)) {
                       throw new IllegalArgumentException(String.format(
                           "Partition key field not found: %s, rowType: %s", partitionKeyField, rowTypeFieldNames));
                   }
               }
               return partitionKeyFields;
           }
           return Collections.emptyList();
       }
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -162,12 +168,25 @@ private Properties getKafkaProperties(Config pluginConfig) {
 
     // todo: parse the target field from config
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {

Review Comment:
   suggestion
   
   ```java
       private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
           if (pluginConfig.hasPath(PARTITION)){
               return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC),
                   pluginConfig.getInt(PARTITION), seaTunnelRowType);
           } else {
               return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC),
                   getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType);
           }
       }
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java:
##########
@@ -34,9 +34,9 @@
     /**
      * Use Key serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord}.
      *
-     * @param key String
+     * @param key seatunnel row
      * @param row seatunnel row
      * @return kafka record.
      */
-    ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
+    ProducerRecord<K, V> serializeRowByKey(SeaTunnelRow key, SeaTunnelRow row);

Review Comment:
   suggestion
   
   remove this method
   ```suggestion
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -67,13 +72,12 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     // check config
     @Override
     public void write(SeaTunnelRow element) {

Review Comment:
   suggestion
   
   ```java
       public void write(SeaTunnelRow element) {
           ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
           kafkaProducerSender.send(producerRecord);
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] harveyyue commented on a diff in pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
harveyyue commented on code in PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#discussion_r1013656362


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java:
##########
@@ -84,51 +83,55 @@ public void startKafkaContainer() {
                 .pollInterval(500, TimeUnit.MILLISECONDS)
                 .atMost(180, TimeUnit.SECONDS)
                 .untilAsserted(() -> initKafkaProducer());
-        generateTestData();
     }
 
-    @SuppressWarnings("checkstyle:Indentation")
-    private void generateTestData() {
-
-        SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType(
-                new String[]{
-                        "id",
-                        "c_map",
-                        "c_array",
-                        "c_string",
-                        "c_boolean",
-                        "c_tinyint",
-                        "c_smallint",
-                        "c_int",
-                        "c_bigint",
-                        "c_float",
-                        "c_double",
-                        "c_decimal",
-                        "c_bytes",
-                        "c_date",
-                        "c_timestamp"
-                },
-                new SeaTunnelDataType[]{
-                        BasicType.LONG_TYPE,
-                        new MapType(BasicType.STRING_TYPE, BasicType.SHORT_TYPE),
-                        ArrayType.BYTE_ARRAY_TYPE,
-                        BasicType.STRING_TYPE,
-                        BasicType.BOOLEAN_TYPE,
-                        BasicType.BYTE_TYPE,
-                        BasicType.SHORT_TYPE,
-                        BasicType.INT_TYPE,
-                        BasicType.LONG_TYPE,
-                        BasicType.FLOAT_TYPE,
-                        BasicType.DOUBLE_TYPE,
-                        new DecimalType(2, 1),
-                        PrimitiveByteArrayType.INSTANCE,
-                        LocalTimeType.LOCAL_DATE_TYPE,
-                        LocalTimeType.LOCAL_DATE_TIME_TYPE
-                }
-        );
-
-        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType);
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+
+    @TestTemplate
+    public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
+        Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    @TestTemplate
+    public void testSourceKafkaTextToConsole(TestContainer container) throws IOException, InterruptedException {
+        TextSerializationSchema serializer = TextSerializationSchema.builder()
+                .seaTunnelRowType(SEATUNNEL_ROW_TYPE)
+                .delimiter(",")
+                .build();
+        generateTestData(row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)));
+        Container.ExecResult execResult = container.executeJob("/kafkasource_text_to_console.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

Review Comment:
   Already using AssertSink.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] EricJoy2048 commented on pull request #3230: [Improve][Connector-V2][Kafka] Support to specify multiple partition keys

Posted by GitBox <gi...@apache.org>.
EricJoy2048 commented on PR #3230:
URL: https://github.com/apache/incubator-seatunnel/pull/3230#issuecomment-1312343772

   Please merge the dev code after https://github.com/apache/incubator-seatunnel/pull/3401 is merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org