You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/07/12 11:24:22 UTC
[incubator-seatunnel] branch dev updated: fix KafkaTableStream source json parse issue (#2168)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dde91db51 fix KafkaTableStream source json parse issue (#2168)
dde91db51 is described below
commit dde91db51a0ae9ab80fccad6e8d62feaf232b077
Author: Mustard <mu...@foxmail.com>
AuthorDate: Tue Jul 12 19:24:15 2022 +0800
fix KafkaTableStream source json parse issue (#2168)
---
.../java/org/apache/seatunnel/flink/util/SchemaUtil.java | 13 ++++++++-----
.../seatunnel/flink/kafka/source/KafkaTableStream.java | 2 +-
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
index c980e308e..80040d188 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
@@ -69,7 +69,7 @@ public final class SchemaUtil {
getJsonSchema(schema, (ObjectNode) info);
break;
case CSV:
- getCsvSchema(schema, (List<Map<String, String>>) info);
+ getCsvSchema(schema, (ArrayNode) info);
break;
case ORC:
getOrcSchema(schema, (ObjectNode) info);
@@ -149,11 +149,14 @@ public final class SchemaUtil {
}
}
- private static void getCsvSchema(Schema schema, List<Map<String, String>> schemaList) {
+ private static void getCsvSchema(Schema schema, ArrayNode schemaList) {
+ Iterator<JsonNode> iterator = schemaList.elements();
+
+ while (iterator.hasNext()) {
+ JsonNode jsonNode = iterator.next();
+ String field = jsonNode.get("field").textValue();
+ String type = jsonNode.get("type").textValue().toUpperCase();
- for (Map<String, String> map : schemaList) {
- String field = map.get("field");
- String type = map.get("type").toUpperCase();
schema.field(field, type);
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index 503824131..dbb60c7cf 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -111,7 +111,7 @@ public class KafkaTableStream implements FlinkStreamSource {
}
String schemaContent = config.getString(SCHEMA);
format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
- schemaInfo = JsonUtils.parseObject(schemaContent);
+ schemaInfo = JsonUtils.parseArray(schemaContent);
}
@Override