Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/12 11:24:22 UTC

[incubator-seatunnel] branch dev updated: fix KafkaTableStream source json parse issue (#2168)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dde91db51 fix KafkaTableStream source json parse issue (#2168)
dde91db51 is described below

commit dde91db51a0ae9ab80fccad6e8d62feaf232b077
Author: Mustard <mu...@foxmail.com>
AuthorDate: Tue Jul 12 19:24:15 2022 +0800

    fix KafkaTableStream source json parse issue (#2168)
---
 .../java/org/apache/seatunnel/flink/util/SchemaUtil.java    | 13 ++++++++-----
 .../seatunnel/flink/kafka/source/KafkaTableStream.java      |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
index c980e308e..80040d188 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
@@ -69,7 +69,7 @@ public final class SchemaUtil {
                 getJsonSchema(schema, (ObjectNode) info);
                 break;
             case CSV:
-                getCsvSchema(schema, (List<Map<String, String>>) info);
+                getCsvSchema(schema, (ArrayNode) info);
                 break;
             case ORC:
                 getOrcSchema(schema, (ObjectNode) info);
@@ -149,11 +149,14 @@ public final class SchemaUtil {
         }
     }
 
-    private static void getCsvSchema(Schema schema, List<Map<String, String>> schemaList) {
+    private static void getCsvSchema(Schema schema, ArrayNode schemaList) {
+        Iterator<JsonNode> iterator = schemaList.elements();
+
+        while (iterator.hasNext()) {
+            JsonNode jsonNode = iterator.next();
+            String field = jsonNode.get("field").textValue();
+            String type = jsonNode.get("type").textValue().toUpperCase();
 
-        for (Map<String, String> map : schemaList) {
-            String field = map.get("field");
-            String type = map.get("type").toUpperCase();
             schema.field(field, type);
         }
     }
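
For context: after this change the CSV schema is handled as a JSON array of {"field": ..., "type": ...} objects. A minimal standalone sketch of the same iteration, using Jackson directly and an illustrative schema string (not taken from the commit):

    import java.util.Iterator;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;

    public class CsvSchemaSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative schema value; in SeaTunnel this comes from the source's schema option.
            String schemaContent =
                "[{\"field\":\"name\",\"type\":\"string\"},{\"field\":\"age\",\"type\":\"int\"}]";

            // Parse the schema as a JSON array (the fixed KafkaTableStream uses JsonUtils.parseArray for this).
            ArrayNode schemaList = (ArrayNode) new ObjectMapper().readTree(schemaContent);

            // Iterate the array elements and read "field"/"type" from each object,
            // mirroring the loop in the updated getCsvSchema.
            Iterator<JsonNode> iterator = schemaList.elements();
            while (iterator.hasNext()) {
                JsonNode jsonNode = iterator.next();
                String field = jsonNode.get("field").textValue();
                String type = jsonNode.get("type").textValue().toUpperCase();
                System.out.println(field + " -> " + type);
            }
        }
    }
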
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index 503824131..dbb60c7cf 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -111,7 +111,7 @@ public class KafkaTableStream implements FlinkStreamSource {
         }
         String schemaContent = config.getString(SCHEMA);
         format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
-        schemaInfo = JsonUtils.parseObject(schemaContent);
+        schemaInfo = JsonUtils.parseArray(schemaContent);
     }
 
     @Override
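
The second hunk follows from the first: the schema option for a CSV source is a JSON array, so parsing it with the object parser fails, and the array parser is used instead. A small Jackson-based sketch (not the actual JsonUtils implementation) illustrating the distinction:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ParseArrayVsObject {
        public static void main(String[] args) throws Exception {
            // The CSV schema option is a JSON array, e.g. [{"field":"name","type":"string"}].
            String schemaContent = "[{\"field\":\"name\",\"type\":\"string\"}]";

            JsonNode parsed = new ObjectMapper().readTree(schemaContent);
            // The parsed tree is an ArrayNode, not an ObjectNode, which is why the
            // schema must be parsed with an array parser (JsonUtils.parseArray in the fix).
            System.out.println(parsed.isArray());  // true
            System.out.println(parsed.isObject()); // false
        }
    }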