Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/09/12 14:36:28 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2720: [Feature][seatunnel-connectors-v2][connector-kafka] Kafka supports custom schema #2371

TyrantLucifer commented on code in PR #2720:
URL: https://github.com/apache/incubator-seatunnel/pull/2720#discussion_r968495747


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -58,15 +59,16 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     private final ConsumerMetadata metadata;
     private final Set<KafkaSourceSplit> sourceSplits;
     private final Map<TopicPartition, Long> endOffset;
-    // TODO support user custom type
-    private SeaTunnelRowType typeInfo;
+
+    private final DeserializationSchema<T> deserializationSchema;

Review Comment:
   The same as above.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -38,18 +39,21 @@
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaConfig;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
 
 import java.util.Properties;
 
 @AutoService(SeaTunnelSource.class)
-public class KafkaSource implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState> {
-
-    private static final String DEFAULT_CONSUMER_GROUP = "SeaTunnel-Consumer-Group";
+public class KafkaSource<T> implements SeaTunnelSource<T, KafkaSourceSplit, KafkaSourceState> {

Review Comment:
   Why change the generic type?



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -101,9 +103,13 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
             for (TopicPartition partition : partitions) {
                 for (ConsumerRecord<byte[], byte[]> record : records.records(partition)) {
 
-                    String v = stringDeserializer.deserialize(partition.topic(), record.value());
-                    String t = partition.topic();
-                    output.collect(new SeaTunnelRow(new Object[]{t, v}));
+                    if(deserializationSchema != null){
+                        deserializationSchema.deserialize(record.value(),output);
+                    }else{
+                        String v = stringDeserializer.deserialize(partition.topic(), record.value());
+                        String t = partition.topic();
+                        output.collect((T) new SeaTunnelRow(new Object[]{t, v}));

Review Comment:
   If the user does not assign a schema, it will generate a simple schema, as shown below:
   
   ```
   content
   data
   ```
   So the row only has one field, named `content`, but you generate a row with two fields.
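   
   A minimal sketch of a fallback branch that would match that single-field schema, reusing the names from the diff above (the unchecked cast stays until the generic-type question is settled):
   
   ```java
   // Sketch only: assumes the generated simple text schema has a single
   // STRING field named "content", so the row carries exactly one value.
   String v = stringDeserializer.deserialize(partition.topic(), record.value());
   output.collect((T) new SeaTunnelRow(new Object[]{v}));
   ```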



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -87,7 +89,7 @@ public void close() throws IOException {
     }
 
     @Override
-    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+    public void pollNext(Collector<T> output) throws Exception {

Review Comment:
   The same as above.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -62,50 +66,47 @@ public Boundedness getBoundedness() {
 
     @Override
     public String getPluginName() {
-        return "Kafka";
+        return IDENTIFIER;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVER);
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TOPIC, BOOTSTRAP_SERVER);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        this.metadata.setTopic(config.getString(TOPIC));
-        if (config.hasPath(PATTERN)) {
-            this.metadata.setPattern(config.getBoolean(PATTERN));
+        this.metadata.setTopic(pluginConfig.getString(TOPIC));
+        if (pluginConfig.hasPath(PATTERN)) {
+            this.metadata.setPattern(pluginConfig.getBoolean(PATTERN));
         }
-        this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER));
+        this.metadata.setBootstrapServer(pluginConfig.getString(BOOTSTRAP_SERVER));
         this.metadata.setProperties(new Properties());
 
-        if (config.hasPath(CONSUMER_GROUP)) {
-            this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP));
+        if (pluginConfig.hasPath(CONSUMER_GROUP)) {
+            this.metadata.setConsumerGroup(pluginConfig.getString(CONSUMER_GROUP));
         } else {
-            this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
+            this.metadata.setConsumerGroup(KafkaConfig.DEFAULT_CONSUMER_GROUP);
         }
 
-        if (config.hasPath(COMMIT_ON_CHECKPOINT)) {
-            this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT));
+        if (pluginConfig.hasPath(COMMIT_ON_CHECKPOINT)) {
+            this.metadata.setCommitOnCheckpoint(pluginConfig.getBoolean(COMMIT_ON_CHECKPOINT));
         }
 
-        TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
+        TypesafeConfigUtils.extractSubConfig(pluginConfig, "kafka.", false).entrySet().forEach(e -> {
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
 
-        // TODO support user custom row type
-        this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
-                new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+        setDeserialization(pluginConfig);
     }
 
     @Override
-    public SeaTunnelRowType getProducedType() {
-        return this.typeInfo;
+    public SeaTunnelDataType<T> getProducedType() {

Review Comment:
   The same as above.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -46,7 +47,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSourceSplit> {
+public class KafkaSourceReader<T> implements SourceReader<T, KafkaSourceSplit> {

Review Comment:
   The same as above.



##########
seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/java/org/apache/seatunnel/example/flink/v2/KafkaToConsoleExample.java:
##########
@@ -0,0 +1,51 @@
+/*

Review Comment:
   The seatunnel-examples module is only used by developers, so test cases should not be submitted.



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -122,4 +123,15 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerat
     public void setSeaTunnelContext(SeaTunnelContext seaTunnelContext) {
         this.seaTunnelContext = seaTunnelContext;
     }
+
+    private void setDeserialization(Config pluginConfig) {
+        SeaTunnelRowType rowType;
+        if (pluginConfig.hasPath(KafkaConfig.SCHEMA)) {
+            Config schema = pluginConfig.getConfig(KafkaConfig.SCHEMA);
+            rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+        } else {
+            rowType = SeaTunnelSchema.buildSimpleTextSchema();
+        }
+        deserializationSchema = (DeserializationSchema<T>) new JsonDeserializationSchema(false, false, rowType);

Review Comment:
   `T` should be replaced by `SeaTunnelRow`.
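   
   A minimal sketch of the suggested change, reusing only the names already in this diff; declaring the field against the concrete `SeaTunnelRow` removes the need for the unchecked cast:
   
   ```java
   // Sketch: type the field with the concrete SeaTunnelRow instead of T,
   // so the JsonDeserializationSchema assignment compiles without a cast.
   private DeserializationSchema<SeaTunnelRow> deserializationSchema;
   
   private void setDeserialization(Config pluginConfig) {
       SeaTunnelRowType rowType;
       if (pluginConfig.hasPath(KafkaConfig.SCHEMA)) {
           // Build the row type from the user-provided schema block.
           Config schema = pluginConfig.getConfig(KafkaConfig.SCHEMA);
           rowType = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
       } else {
           // Fall back to the generated single-field text schema.
           rowType = SeaTunnelSchema.buildSimpleTextSchema();
       }
       deserializationSchema = new JsonDeserializationSchema(false, false, rowType);
   }
   ```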



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -62,50 +66,47 @@ public Boundedness getBoundedness() {
 
     @Override
     public String getPluginName() {
-        return "Kafka";
+        return IDENTIFIER;
     }
 
     @Override
-    public void prepare(Config config) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(config, TOPIC, BOOTSTRAP_SERVER);
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, TOPIC, BOOTSTRAP_SERVER);
         if (!result.isSuccess()) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
-        this.metadata.setTopic(config.getString(TOPIC));
-        if (config.hasPath(PATTERN)) {
-            this.metadata.setPattern(config.getBoolean(PATTERN));
+        this.metadata.setTopic(pluginConfig.getString(TOPIC));
+        if (pluginConfig.hasPath(PATTERN)) {
+            this.metadata.setPattern(pluginConfig.getBoolean(PATTERN));
         }
-        this.metadata.setBootstrapServer(config.getString(BOOTSTRAP_SERVER));
+        this.metadata.setBootstrapServer(pluginConfig.getString(BOOTSTRAP_SERVER));
         this.metadata.setProperties(new Properties());
 
-        if (config.hasPath(CONSUMER_GROUP)) {
-            this.metadata.setConsumerGroup(config.getString(CONSUMER_GROUP));
+        if (pluginConfig.hasPath(CONSUMER_GROUP)) {
+            this.metadata.setConsumerGroup(pluginConfig.getString(CONSUMER_GROUP));
         } else {
-            this.metadata.setConsumerGroup(DEFAULT_CONSUMER_GROUP);
+            this.metadata.setConsumerGroup(KafkaConfig.DEFAULT_CONSUMER_GROUP);
         }
 
-        if (config.hasPath(COMMIT_ON_CHECKPOINT)) {
-            this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT));
+        if (pluginConfig.hasPath(COMMIT_ON_CHECKPOINT)) {
+            this.metadata.setCommitOnCheckpoint(pluginConfig.getBoolean(COMMIT_ON_CHECKPOINT));
         }
 
-        TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
+        TypesafeConfigUtils.extractSubConfig(pluginConfig, "kafka.", false).entrySet().forEach(e -> {
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
 
-        // TODO support user custom row type
-        this.typeInfo = new SeaTunnelRowType(new String[]{"topic", "raw_message"},
-                new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.STRING_TYPE});
-
+        setDeserialization(pluginConfig);
     }
 
     @Override
-    public SeaTunnelRowType getProducedType() {
-        return this.typeInfo;
+    public SeaTunnelDataType<T> getProducedType() {
+        return deserializationSchema.getProducedType();
     }
 
     @Override
-    public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {
-        return new KafkaSourceReader(this.metadata, this.typeInfo, readerContext);
+    public SourceReader<T, KafkaSourceSplit> createReader(SourceReader.Context readerContext) throws Exception {

Review Comment:
   The same as above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org