Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/12 14:19:20 UTC

[GitHub] [incubator-seatunnel] eyys opened a new pull request, #2720: [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

eyys opened a new pull request, #2720:
URL: https://github.com/apache/incubator-seatunnel/pull/2720

   ## Purpose of this pull request
   [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371
   
   ## Check list
   
   * [x] Code changes are covered by tests, or they do not need tests for the following reason:
   * [ ] If any new JAR binary package is added in your PR, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/contribution/new-license.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   
   ## Purpose of this pull request
   Kafka supports custom schema #2371
   
   ## Check list
   Added e2e tests
   Added example tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #2720: [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

Posted by GitBox <gi...@apache.org>.
hailin0 commented on code in PR #2720:
URL: https://github.com/apache/incubator-seatunnel/pull/2720#discussion_r969092897


##########
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafka_to_console.conf:
##########
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  Kafka {
+    result_table_name = "kafka"
+    schema = {
+      fields {
+        name = "string"
+        age = "int"

Review Comment:
   Should this test cover all data types?





[GitHub] [incubator-seatunnel] TyrantLucifer commented on pull request #2720: [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on PR #2720:
URL: https://github.com/apache/incubator-seatunnel/pull/2720#issuecomment-1243840791

   BTW, you should also add Spark e2e test cases and update the documentation of the Kafka source connector.




[GitHub] [incubator-seatunnel] eyys closed pull request #2720: [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

Posted by GitBox <gi...@apache.org>.
eyys closed pull request #2720: [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371
URL: https://github.com/apache/incubator-seatunnel/pull/2720




[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2720: [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

Posted by GitBox <gi...@apache.org>.
TyrantLucifer commented on code in PR #2720:
URL: https://github.com/apache/incubator-seatunnel/pull/2720#discussion_r968495747


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -58,15 +59,16 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     private final ConsumerMetadata metadata;
     private final Set<KafkaSourceSplit> sourceSplits;
     private final Map<TopicPartition, Long> endOffset;
-    // TODO support user custom type
-    private SeaTunnelRowType typeInfo;
+
+    private final DeserializationSchema<T> deserializationSchema;

Review Comment:
   Same as above: why change the generic type?



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -38,18 +39,21 @@
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
 import java.util.Properties;
 
 @AutoService(SeaTunnelSource.class)
-public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState> {
-
-    private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
+public class KafkaSource<T> implements SeaTunnelSource<T, KafkaSourceSplit, KafkaSourceState> {

Review Comment:
   Why change the generic type?
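   The risk behind this question can be sketched in plain Java. Assumption: `Row` and `Collector` below are hypothetical stand-ins for `SeaTunnelRow` and SeaTunnel's `Collector`, not the real API. With a type parameter, the `(T)` cast in `pollNext` is unchecked and erased, so a reader instantiated with the wrong `T` still compiles and only fails later with a `ClassCastException`; a reader with a fixed element type needs no cast at all.

```java
import java.util.ArrayList;
import java.util.List;

public class GenericSourceSketch {

    /** Hypothetical stand-in for SeaTunnelRow: an array of field values. */
    static final class Row {
        final Object[] fields;

        Row(Object... fields) {
            this.fields = fields;
        }
    }

    /** Hypothetical stand-in for SeaTunnel's Collector<T>. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Generic reader, mirroring `output.collect((T) new SeaTunnelRow(...))`. */
    static final class GenericReader<T> {
        @SuppressWarnings("unchecked")
        void pollNext(String value, Collector<T> output) {
            // The cast is erased at runtime, so nothing is checked here.
            output.collect((T) new Row(value));
        }
    }

    /** Concrete reader: the element type is fixed, so no cast is needed. */
    static final class RowReader {
        void pollNext(String value, Collector<Row> output) {
            output.collect(new Row(value));
        }
    }

    /** A GenericReader<Integer> compiles, but using its output fails. */
    static String genericReaderOutcome() {
        List<Integer> sink = new ArrayList<>();
        try {
            new GenericReader<Integer>().pollNext("hello", sink::add);
            // At the latest, the implicit cast to Integer fails here.
            Integer first = sink.get(0);
            return "no error: " + first;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    /** The concrete reader yields a Row with one field and needs no cast. */
    static int concreteFieldCount() {
        List<Row> sink = new ArrayList<>();
        new RowReader().pollNext("hello", sink::add);
        return sink.get(0).fields.length;
    }

    public static void main(String[] args) {
        System.out.println(genericReaderOutcome());
        System.out.println(concreteFieldCount());
    }
}
```

   Fixing the element type to the row class moves this kind of mistake from runtime to compile time.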



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -101,9 +103,13 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
             for (TopicPartition partition : partitions) {
                 for (ConsumerRecord<byte[], byte[]> record : records.records(partition)) {
 
-                    String v = stringDeserializer.deserialize(partition.topic(), record.value());
-                    String t = partition.topic();
-                    output.collect(new SeaTunnelRow(new Object[]{t, v}));
+                    if(deserializationSchema != null){
+                        deserializationSchema.deserialize(record.value(),output);
+                    }else{
+                        String v = stringDeserializer.deserialize(partition.topic(), record.value());
+                        String t = partition.topic();
+                        output.collect((T) new SeaTunnelRow(new Object[]{t, v}));

Review Comment:
   If the user does not specify a schema, a simple schema is generated, as shown below:
   
   ```
   content
   data
   ```
   So the row has only one field, named `content`, but you generate a row with two fields.
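   The arity mismatch can be illustrated with a small plain-Java sketch. The field name `content` comes from the comment above; the helper names, the `String[]` field list, and the UTF-8 decoding (Kafka's `StringDeserializer` defaults to UTF-8) are assumptions for illustration, not SeaTunnel code. The row built in the diff carries two values (topic, message), while the default schema declares one field, so only a one-value row matches.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SimpleTextRowSketch {

    // Assumed field list of the default "simple text" schema, per the review comment.
    static final String[] SIMPLE_TEXT_FIELDS = {"content"};

    /** As submitted: two values (topic, message) for a one-field schema. */
    static Object[] rowAsSubmitted(String topic, byte[] value) {
        String v = new String(value, StandardCharsets.UTF_8);
        return new Object[]{topic, v};
    }

    /** Suggested shape: one value per schema field, i.e. only the raw message. */
    static Object[] rowMatchingSchema(byte[] value) {
        return new Object[]{new String(value, StandardCharsets.UTF_8)};
    }

    /** A row is consistent with the schema only if the arities agree. */
    static boolean matchesSchema(Object[] row) {
        return row.length == SIMPLE_TEXT_FIELDS.length;
    }

    public static void main(String[] args) {
        byte[] payload = "data".getBytes(StandardCharsets.UTF_8);
        System.out.println(matchesSchema(rowAsSubmitted("test_topic", payload)));
        System.out.println(Arrays.toString(rowMatchingSchema(payload)));
    }
}
```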



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -87,7 +89,7 @@ public void close() throws IOException {
     }
 
     @Override
-    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+    public void pollNext(Collector<T> output) throws Exception {

Review Comment:
   Same as above: why change the generic type?



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -62,50 +66,47 @@ public Boundedness getBoundedness() {
 
     @Override
     public String getPluginName() {
-        return "Kafka";
+        return IDENTIFIER;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVER);
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TOPIC, BOOTSTRAP_SERVER);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        this.metadata.setTopic(config.getString(TOPIC));
-        if (config.hasPath(PATTERN)) {
-            this.metadata.setPattern(config.getBoolean(PATTERN));
+        this.metadata.setTopic(pluginConfig.getString(TOPIC));
+        if (pluginConfig.hasPath(PATTERN)) {
+            this.metadata.setPattern(pluginConfig.getBoolean(PATTERN));
         }
-        this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER));
+        this.metadata.setBootstrapServer(pluginConfig.getString(BOOTSTRAP_SERVER));
         this.metadata.setProperties(new Properties());
 
-        if (config.hasPath(CONSUMER_GROUP)) {
-            this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP));
+        if (pluginConfig.hasPath(CONSUMER_GROUP)) {
+            this.metadata.setConsumerGroup(pluginConfig.getString(CONSUMER_GROUP));
         } else {
-            this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
+            this.metadata.setConsumerGroup(KafkaConfig.DEFAULT_CONSUMER_GROUP);
         }
 
-        if (config.hasPath(COMMIT_ON_CHECKPOINT)) {
-            this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT));
+        if (pluginConfig.hasPath(COMMIT_ON_CHECKPOINT)) {
+            this.metadata.setCommitOnCheckpoint(pluginConfig.getBoolean(COMMIT_ON_CHECKPOINT));
         }
 
-        TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
+        TypesafeConfigUtils.extractSubConfig(pluginConfig, "kafka.", false).entrySet().forEach(e -> {
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
 
-        // TODO support user custom row type
-        this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
-                new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+        setDeserialization(pluginConfig);
     }
 
     @Override
-    public SeaTunnelRowType getProducedType() {
-        return this.typeInfo;
+    public SeaTunnelDataType<T> getProducedType() {

Review Comment:
   Same as above: why change the generic type?
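   The option handling in `prepare` above (a configured consumer group with a default fallback, plus every `kafka.*` key passed through to the consumer `Properties` with the prefix stripped) can be sketched without Typesafe Config. Assumptions: a `Map` stands in for the plugin `Config`, the key name `consumer.group` is assumed, and the default value is the removed `DEFAULT_CONSUMER_GROUP` constant from the diff. This approximates what `TypesafeConfigUtils.extractSubConfig(pluginConfig, "kafka.", false)` is used for; it is not that method.

```java
import java.util.Map;
import java.util.Properties;

public class KafkaPrepareSketch {

    // Default taken from the removed DEFAULT_CONSUMER_GROUP constant in the diff.
    static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
    // Assumed option key; the real constant's value is not shown in the diff.
    static final String CONSUMER_GROUP = "consumer.group";
    static final String KAFKA_PREFIX = "kafka.";

    /** Consumer group from the config, or the default when the key is absent. */
    static String consumerGroup(Map<String, ?> pluginConfig) {
        Object group = pluginConfig.get(CONSUMER_GROUP);
        return group != null ? String.valueOf(group) : DEFAULT_CONSUMER_GROUP;
    }

    /** Copies every "kafka.*" entry into Properties with the prefix stripped. */
    static Properties kafkaProperties(Map<String, ?> pluginConfig) {
        Properties props = new Properties();
        for (Map.Entry<String, ?> e : pluginConfig.entrySet()) {
            if (e.getKey().startsWith(KAFKA_PREFIX)) {
                String key = e.getKey().substring(KAFKA_PREFIX.length());
                // Values are stored as strings, like String.valueOf(...unwrapped()) in the diff.
                props.put(key, String.valueOf(e.getValue()));
            }
        }
        return props;
    }
}
```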



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -46,7 +47,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSourceSplit> {
+public class KafkaSourceReader<T> implements SourceReader<T, KafkaSourceSplit> {

Review Comment:
   Same as above: why change the generic type?



##########
seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/KafkaToConsoleExample.java:
##########
@@ -0,0 +1,51 @@
+/*

Review Comment:
   The seatunnel-examples module is only for developers' local use, so test cases should not be submitted to it.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -122,4 +123,15 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerat
     public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
         this.seaTunnelContext = seaTunnelContext;
     }
+
+    private void setDeserialization(Config pluginConfig) {
+        SeaTunnelRowType rowType;
+        if (pluginConfig.hasPath(KafkaConfig.SCHEMA)) {
+            Config schema = pluginConfig.getConfig(KafkaConfig.SCHEMA);
+            rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+        } else {
+            rowType = SeaTunnelSchema.buildSimpleTextSchema();
+        }
+        deserializationSchema = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);

Review Comment:
   `T` should be replaced by `SeaTunnelRow`.
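   With a concrete row type, the branch in `setDeserialization` returns a fixed type and the `(DeserializationSchema<T>)` cast disappears. A minimal plain-Java sketch, assuming a hypothetical `RowType` in place of `SeaTunnelRowType` and a map of field name to type name in place of the `schema.fields` config block (`null` meaning no `schema` block); it does not call `SeaTunnelSchema` or `JsonDeserializationSchema`.

```java
import java.util.Map;

public class RowTypeSelectionSketch {

    /** Hypothetical stand-in for SeaTunnelRowType: parallel name/type arrays. */
    static final class RowType {
        final String[] names;
        final String[] types;

        RowType(String[] names, String[] types) {
            this.names = names;
            this.types = types;
        }
    }

    /** Assumed default schema: one string field named "content". */
    static RowType simpleTextSchema() {
        return new RowType(new String[]{"content"}, new String[]{"string"});
    }

    /**
     * Mirrors the branch in setDeserialization: use the configured schema
     * fields when present (non-null), otherwise the simple text schema.
     * The return type is concrete, so callers never need an unchecked cast.
     */
    static RowType selectRowType(Map<String, String> schemaFields) {
        if (schemaFields == null) {
            return simpleTextSchema();
        }
        String[] names = new String[schemaFields.size()];
        String[] types = new String[schemaFields.size()];
        int i = 0;
        // One pass over the entries keeps names and types aligned;
        // a LinkedHashMap would also preserve the order of the config.
        for (Map.Entry<String, String> e : schemaFields.entrySet()) {
            names[i] = e.getKey();
            types[i] = e.getValue();
            i++;
        }
        return new RowType(names, types);
    }
}
```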



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -62,50 +66,47 @@ public Boundedness getBoundedness() {
 
     @Override
     public String getPluginName() {
-        return "Kafka";
+        return IDENTIFIER;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVER);
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TOPIC, BOOTSTRAP_SERVER);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        this.metadata.setTopic(config.getString(TOPIC));
-        if (config.hasPath(PATTERN)) {
-            this.metadata.setPattern(config.getBoolean(PATTERN));
+        this.metadata.setTopic(pluginConfig.getString(TOPIC));
+        if (pluginConfig.hasPath(PATTERN)) {
+            this.metadata.setPattern(pluginConfig.getBoolean(PATTERN));
         }
-        this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER));
+        this.metadata.setBootstrapServer(pluginConfig.getString(BOOTSTRAP_SERVER));
         this.metadata.setProperties(new Properties());
 
-        if (config.hasPath(CONSUMER_GROUP)) {
-            this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP));
+        if (pluginConfig.hasPath(CONSUMER_GROUP)) {
+            this.metadata.setConsumerGroup(pluginConfig.getString(CONSUMER_GROUP));
         } else {
-            this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
+            this.metadata.setConsumerGroup(KafkaConfig.DEFAULT_CONSUMER_GROUP);
         }
 
-        if (config.hasPath(COMMIT_ON_CHECKPOINT)) {
-            this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT));
+        if (pluginConfig.hasPath(COMMIT_ON_CHECKPOINT)) {
+            this.metadata.setCommitOnCheckpoint(pluginConfig.getBoolean(COMMIT_ON_CHECKPOINT));
         }
 
-        TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
+        TypesafeConfigUtils.extractSubConfig(pluginConfig, "kafka.", false).entrySet().forEach(e -> {
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
 
-        // TODO support user custom row type
-        this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
-                new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+        setDeserialization(pluginConfig);
     }
 
     @Override
-    public SeaTunnelRowType getProducedType() {
-        return this.typeInfo;
+    public SeaTunnelDataType<T> getProducedType() {
+        return deserializationSchema.getProducedType();
     }
 
     @Override
-    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
-        return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
+    public SourceReader<T, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {

Review Comment:
   Same as above: why change the generic type?





[GitHub] [incubator-seatunnel] eyys commented on a diff in pull request #2720: [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

Posted by GitBox <gi...@apache.org>.
eyys commented on code in PR #2720:
URL: https://github.com/apache/incubator-seatunnel/pull/2720#discussion_r973700637


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -38,18 +39,21 @@
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
 import java.util.Properties;
 
 @AutoService(SeaTunnelSource.class)
-public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState> {
-
-    private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
+public class KafkaSource<T> implements SeaTunnelSource<T, KafkaSourceSplit, KafkaSourceState> {

Review Comment:
   I followed the Pulsar connector's implementation.


