Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/10/16 12:55:53 UTC

[GitHub] [incubator-seatunnel] TyrantLucifer commented on a diff in pull request #2783: [Feature][connectors-v2][kafka] Kafka supports custom schema #2371

TyrantLucifer commented on code in PR #2783:
URL: https://github.com/apache/incubator-seatunnel/pull/2783#discussion_r996438794


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java:
##########
@@ -122,4 +127,33 @@ public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerat
     public void setJobContext(JobContext jobContext) {
         this.jobContext = jobContext;
     }
+
+    private void setDeserialization(Config config) {
+        if (config.hasPath(SCHEMA)) {
+            Config schema = config.getConfig(SCHEMA);
+            typeInfo = SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+            String format = DEFAULT_FORMAT;
+            if (config.hasPath(FORMAT)) {
+                format = config.getString(FORMAT);
+            }
+            if (DEFAULT_FORMAT.equals(format)) {
+                deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
+            } else if ("text".equals(format)) {
+                String delimiter = DEFAULT_FIELD_DELIMITER;
+                if (config.hasPath(FIELD_DELIMITER)) {
+                    delimiter = config.getString(FIELD_DELIMITER);
+                }
+                deserializationSchema = TextDeserializationSchema.builder()
+                        .seaTunnelRowType(typeInfo)
+                        .delimiter(delimiter)
+                        .build();
+            } else {
+                // TODO: use format SPI
+                throw new UnsupportedOperationException("Unsupported format: " + format);
+            }
+        } else {
+            typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
+            this.deserializationSchema = null;

Review Comment:
   ```suggestion
               this.deserializationSchema = TextDeserializationSchema.builder()
                       .seaTunnelRowType(typeInfo)
                       .delimiter(String.valueOf('\002'))
                       .build();
   ```
   If the user does not set the schema option, the connector treats each record as a single-column row, as shown below:
   
   |content|
   |---------|
   |xxxxxxxx|
   
   In that case the upstream data does not need to be split into fields, so just pass in a delimiter that cannot occur in the data when initializing the TextDeserializationSchema.
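
To illustrate the idea, here is a minimal standalone sketch of why a delimiter that never appears in the data leaves the whole record in one field. The class `ImpossibleDelimiterDemo` and the helper `splitFields` are hypothetical names used only for this example; they are not part of SeaTunnel, and whether `TextDeserializationSchema` splits lines exactly this way is an assumption.

```java
import java.util.regex.Pattern;

public class ImpossibleDelimiterDemo {

    // Hypothetical helper (not SeaTunnel code): split a line on a literal delimiter.
    static String[] splitFields(String line, String delimiter) {
        // Pattern.quote makes the delimiter literal rather than a regex;
        // the -1 limit keeps trailing empty fields instead of dropping them.
        return line.split(Pattern.quote(delimiter), -1);
    }

    public static void main(String[] args) {
        // Same delimiter as in the suggestion: the control character U+0002.
        String delimiter = String.valueOf('\002');

        // A record containing commas, pipes, and tabs, but no U+0002.
        String message = "id=1,name=foo|bar\tbaz";

        String[] fields = splitFields(message, delimiter);

        // The delimiter never matches, so the record stays in a single field.
        System.out.println(fields.length);             // prints 1
        System.out.println(fields[0].equals(message)); // prints true
    }
}
```

The trade-off is that a record which does contain U+0002 would be split, so this relies on that character being effectively absent from the upstream data.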
   
   



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java:
##########
@@ -114,9 +114,12 @@ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
                             List<ConsumerRecord<byte[], byte[]>> recordList = records.records(partition);
                             for (ConsumerRecord<byte[], byte[]> record : recordList) {
 
-                                String v = stringDeserializer.deserialize(partition.topic(), record.value());
-                                String t = partition.topic();
-                                output.collect(new SeaTunnelRow(new Object[]{t, v}));
+                                if (deserializationSchema != null) {
+                                    deserializationSchema.deserialize(record.value(), output);
+                                } else {
+                                    String content = stringDeserializer.deserialize(partition.topic(), record.value());
+                                    output.collect(new SeaTunnelRow(new Object[]{content}));
+                                }

Review Comment:
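   ```suggestion
                                   deserializationSchema.deserialize(record.value(), output);
   ```
   If KafkaSource always sets a deserialization schema, as suggested above, the null check here is no longer needed.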



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
