Posted to commits@seatunnel.apache.org by "hailin0 (via GitHub)" <gi...@apache.org> on 2023/05/06 01:53:29 UTC

[GitHub] [incubator-seatunnel] hailin0 commented on a diff in pull request #4651: [Improve][KafkaSource] Move Kafka Source Create By Source Factory

hailin0 commented on code in PR #4651:
URL: https://github.com/apache/incubator-seatunnel/pull/4651#discussion_r1179927274


##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java:
##########
@@ -170,6 +170,22 @@ public static List<CatalogTable> getCatalogTables(
         return catalogTables;
     }
 
+    public static CatalogTableUtil buildWithReadonlyConfig(ReadonlyConfig option) {
+        if (option.get(FIELDS) == null) {
+            throw new RuntimeException(
+                    "Schema config need option [schema], please correct your config first");
+        }
+        TableSchema tableSchema = parseTableSchema(option.get(FIELDS));
+        return new CatalogTableUtil(
+                CatalogTable.of(
+                        // TODO: other table info
+                        TableIdentifier.of("", "", ""),

Review Comment:
   Is there no need to define a table identifier?



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java:
##########
@@ -84,25 +86,18 @@ public class Config {
                                     + "Kafka distinguishes different transactions by different transactionId. "
                                     + "This parameter is prefix of kafka transactionId, make sure different job use different prefix.");
 
-    public static final Option<Config> SCHEMA =
-            Options.key("schema")
-                    .objectType(Config.class)
-                    .noDefaultValue()
-                    .withDescription(
-                            "The structure of the data, including field names and field types.");
-
     public static final Option<MessageFormat> FORMAT =
             Options.key("format")
                     .enumType(MessageFormat.class)
-                    .defaultValue(MessageFormat.JSON)
+                    .defaultValue(MessageFormat.TEXT)

Review Comment:
   +1


