You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/06/08 00:38:43 UTC
[incubator-seatunnel] branch api-draft updated: [Improve]Use Jackson replace Fastjson (#1971) (#1985)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 0bfd6259 [Improve]Use Jackson replace Fastjson (#1971) (#1985)
0bfd6259 is described below
commit 0bfd6259338720246a47c179409fbca0880f0159
Author: Kirs <ki...@apache.org>
AuthorDate: Wed Jun 8 08:38:39 2022 +0800
[Improve]Use Jackson replace Fastjson (#1971) (#1985)
* [Improve]Use Jackson replace Fastjson (#1971)
* remove fastjson
* fix lombok scope
---
pom.xml | 10 +-
seatunnel-apis/seatunnel-api-flink/pom.xml | 6 +
.../apache/seatunnel/flink/util/SchemaUtil.java | 50 ++--
seatunnel-apis/seatunnel-api-spark/pom.xml | 5 +
.../seatunnel/spark/utils/SparkStructTypeUtil.java | 16 +-
seatunnel-common/pom.xml | 28 +-
.../apache/seatunnel/common/utils/JsonUtils.java | 282 +++++++++++++++++++++
.../seatunnel-connector-flink-clickhouse/pom.xml | 5 +
.../clickhouse/sink/client/ClickhouseClient.java | 6 +-
.../seatunnel-connector-flink-druid/pom.xml | 5 +
.../flink/druid/sink/DruidOutputFormat.java | 12 +-
.../seatunnel-connector-flink-file/pom.xml | 5 +
.../flink/file/sink/JsonRowOutputFormat.java | 27 +-
.../seatunnel/flink/file/source/FileSource.java | 12 +-
.../flink/file/source/JsonRowInputFormat.java | 22 +-
.../flink/kafka/source/KafkaTableStream.java | 16 +-
.../serialize/DefaultSeaTunnelRowSerializer.java | 4 +-
.../seatunnel-connector-spark-feishu/pom.xml | 5 -
.../seatunnel/spark/feishu/FeishuClient.scala | 59 +++--
.../seatunnel-connector-spark-mongodb/pom.xml | 5 +
.../seatunnel/spark/mongodb/source/MongoDB.scala | 5 +-
seatunnel-dist/release-docs/LICENSE | 1 -
tools/dependencies/known-dependencies.txt | 1 -
23 files changed, 451 insertions(+), 136 deletions(-)
diff --git a/pom.xml b/pom.xml
index 7880a2ae..dc5298ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,6 @@
<hudi.version>0.10.0</hudi.version>
<hadoop.binary.version>2.7</hadoop.binary.version>
<hadoop.version>2.7.5</hadoop.version>
- <fastjson.version>1.2.80</fastjson.version>
<jackson.version>2.12.6</jackson.version>
<lombok.version>1.18.0</lombok.version>
<mysql.version>8.0.16</mysql.version>
@@ -418,16 +417,12 @@
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- <version>${fastjson.version}</version>
- </dependency>
+
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -627,7 +622,6 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/seatunnel-apis/seatunnel-api-flink/pom.xml b/seatunnel-apis/seatunnel-api-flink/pom.xml
index 739b52ec..3afdc7cc 100644
--- a/seatunnel-apis/seatunnel-api-flink/pom.xml
+++ b/seatunnel-apis/seatunnel-api-flink/pom.xml
@@ -37,6 +37,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
index 0c301513..8a4b79dd 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/SchemaUtil.java
@@ -17,13 +17,14 @@
package org.apache.seatunnel.flink.util;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.flink.enums.FormatType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -58,19 +59,19 @@ public final class SchemaUtil {
switch (format) {
case JSON:
- getJsonSchema(schema, (JSONObject) info);
+ getJsonSchema(schema, (ObjectNode) info);
break;
case CSV:
getCsvSchema(schema, (List<Map<String, String>>) info);
break;
case ORC:
- getOrcSchema(schema, (JSONObject) info);
+ getOrcSchema(schema, (ObjectNode) info);
break;
case AVRO:
- getAvroSchema(schema, (JSONObject) info);
+ getAvroSchema(schema, (ObjectNode) info);
break;
case PARQUET:
- getParquetSchema(schema, (JSONObject) info);
+ getParquetSchema(schema, (ObjectNode) info);
break;
default:
}
@@ -110,9 +111,9 @@ public final class SchemaUtil {
return formatDescriptor;
}
- private static void getJsonSchema(Schema schema, JSONObject json) {
-
- for (Map.Entry<String, Object> entry : json.entrySet()) {
+ private static void getJsonSchema(Schema schema, ObjectNode json) {
+ Map<String, Object> jsonMap = JsonUtils.toMap(json);
+ for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof String) {
@@ -123,12 +124,12 @@ public final class SchemaUtil {
schema.field(key, Types.LONG());
} else if (value instanceof BigDecimal) {
schema.field(key, Types.JAVA_BIG_DEC());
- } else if (value instanceof JSONObject) {
- schema.field(key, getTypeInformation((JSONObject) value));
- } else if (value instanceof JSONArray) {
- Object obj = ((JSONArray) value).get(0);
- if (obj instanceof JSONObject) {
- schema.field(key, ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation((JSONObject) obj)));
+ } else if (value instanceof ObjectNode) {
+ schema.field(key, getTypeInformation((ObjectNode) value));
+ } else if (value instanceof ArrayNode) {
+ Object obj = ((ArrayNode) value).get(0);
+ if (obj instanceof ObjectNode) {
+ schema.field(key, ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation((ObjectNode) obj)));
} else {
schema.field(key, ObjectArrayTypeInfo.getInfoFor(Object[].class, TypeInformation.of(Object.class)));
}
@@ -161,7 +162,7 @@ public final class SchemaUtil {
* @param schema schema
* @param json json
*/
- private static void getOrcSchema(Schema schema, JSONObject json) {
+ private static void getOrcSchema(Schema schema, ObjectNode json) {
}
@@ -171,11 +172,11 @@ public final class SchemaUtil {
* @param schema schema
* @param json json
*/
- private static void getParquetSchema(Schema schema, JSONObject json) {
+ private static void getParquetSchema(Schema schema, ObjectNode json) {
}
- private static void getAvroSchema(Schema schema, JSONObject json) {
+ private static void getAvroSchema(Schema schema, ObjectNode json) {
RowTypeInfo typeInfo = (RowTypeInfo) AvroSchemaConverter.<Row>convertToTypeInfo(json.toString());
String[] fieldNames = typeInfo.getFieldNames();
for (String name : fieldNames) {
@@ -183,12 +184,13 @@ public final class SchemaUtil {
}
}
- public static RowTypeInfo getTypeInformation(JSONObject json) {
+ public static RowTypeInfo getTypeInformation(ObjectNode json) {
int size = json.size();
String[] fields = new String[size];
TypeInformation<?>[] informations = new TypeInformation[size];
+ Map<String, Object> jsonMap = JsonUtils.toMap(json);
int i = 0;
- for (Map.Entry<String, Object> entry : json.entrySet()) {
+ for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
fields[i] = key;
@@ -200,10 +202,10 @@ public final class SchemaUtil {
informations[i] = Types.LONG();
} else if (value instanceof BigDecimal) {
informations[i] = Types.JAVA_BIG_DEC();
- } else if (value instanceof JSONObject) {
- informations[i] = getTypeInformation((JSONObject) value);
- } else if (value instanceof JSONArray) {
- JSONObject demo = ((JSONArray) value).getJSONObject(0);
+ } else if (value instanceof ObjectNode) {
+ informations[i] = getTypeInformation((ObjectNode) value);
+ } else if (value instanceof ArrayNode) {
+ ObjectNode demo = (ObjectNode) ((ArrayNode) value).get(0);
informations[i] = ObjectArrayTypeInfo.getInfoFor(Row[].class, getTypeInformation(demo));
}
i++;
diff --git a/seatunnel-apis/seatunnel-api-spark/pom.xml b/seatunnel-apis/seatunnel-api-spark/pom.xml
index 29242201..8f4e7ca0 100644
--- a/seatunnel-apis/seatunnel-api-spark/pom.xml
+++ b/seatunnel-apis/seatunnel-api-spark/pom.xml
@@ -36,6 +36,11 @@
<artifactId>seatunnel-api-base</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java
index e3b0542c..27833f6d 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/utils/SparkStructTypeUtil.java
@@ -18,8 +18,9 @@
package org.apache.seatunnel.spark.utils;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.common.utils.JsonUtils;
-import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
@@ -32,13 +33,14 @@ public final class SparkStructTypeUtil {
private SparkStructTypeUtil() {
}
- public static StructType getStructType(StructType schema, JSONObject json) {
+ public static StructType getStructType(StructType schema, ObjectNode json) {
StructType newSchema = schema.copy(schema.fields());
- for (Map.Entry<String, Object> entry : json.entrySet()) {
+ Map<String, Object> jsonMap = JsonUtils.toMap(json);
+ for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
String field = entry.getKey();
Object type = entry.getValue();
- if (type instanceof JSONObject) {
- StructType st = getStructType(new StructType(), (JSONObject) type);
+ if (type instanceof ObjectNode) {
+ StructType st = getStructType(new StructType(), (ObjectNode) type);
newSchema = newSchema.add(field, st);
} else if (type instanceof List) {
List list = (List) type;
@@ -47,8 +49,8 @@ public final class SparkStructTypeUtil {
newSchema = newSchema.add(field, DataTypes.createArrayType(null, true));
} else {
Object o = list.get(0);
- if (o instanceof JSONObject) {
- StructType st = getStructType(new StructType(), (JSONObject) o);
+ if (o instanceof ObjectNode) {
+ StructType st = getStructType(new StructType(), (ObjectNode) o);
newSchema = newSchema.add(field, DataTypes.createArrayType(st, true));
} else {
DataType st = getType(o.toString());
diff --git a/seatunnel-common/pom.xml b/seatunnel-common/pom.xml
index 24e7dd32..bf801a04 100644
--- a/seatunnel-common/pom.xml
+++ b/seatunnel-common/pom.xml
@@ -34,34 +34,26 @@
<artifactId>seatunnel-config-shade</artifactId>
</dependency>
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
-
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ </dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
</dependency>
</dependencies>
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
new file mode 100644
index 00000000..5f362059
--- /dev/null
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import static com.fasterxml.jackson.databind.DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT;
+import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
+import static com.fasterxml.jackson.databind.DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL;
+import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_GETTERS;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+public class JsonUtils {
+
+ /**
+ * can use static singleton, inject: just make sure to reuse!
+ */
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
+ .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
+ .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
+ .configure(REQUIRE_SETTERS_FOR_GETTERS, true)
+ .setTimeZone(TimeZone.getDefault());
+
+ private JsonUtils() {
+ throw new UnsupportedOperationException("Construct JSONUtils");
+ }
+
+ public static ArrayNode createArrayNode() {
+ return OBJECT_MAPPER.createArrayNode();
+ }
+
+ public static ObjectNode createObjectNode() {
+ return OBJECT_MAPPER.createObjectNode();
+ }
+
+ public static JsonNode toJsonNode(Object obj) {
+ return OBJECT_MAPPER.valueToTree(obj);
+ }
+
+ /**
+ * json representation of object
+ *
+ * @param object object
+ * @param feature feature
+ * @return object to json string
+ */
+ public static String toJsonString(Object object, SerializationFeature feature) {
+ try {
+ ObjectWriter writer = OBJECT_MAPPER.writer(feature);
+ return writer.writeValueAsString(object);
+ } catch (Exception e) {
+ throw new RuntimeException("Object to json exception!", e);
+ }
+ }
+
+ /**
+ * This method deserializes the specified Json into an object of the specified class. It is not
+ * suitable to use if the specified class is a generic type since it will not have the generic
+ * type information because of the Type Erasure feature of Java. Therefore, this method should not
+ * be used if the desired type is a generic type. Note that this method works fine if the any of
+ * the fields of the specified object are generics, just the object itself should not be a
+ * generic type.
+ *
+ * @param json the string from which the object is to be deserialized
+ * @param clazz the class of T
+ * @param <T> T
+ * @return an object of type T from the string
+ * classOfT
+ */
+ public static <T> T parseObject(String json, Class<T> clazz) {
+ if (StringUtils.isEmpty(json)) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(json, clazz);
+ } catch (Exception e) {
+ throw new RuntimeException("Json parse object exception!", e);
+ }
+ }
+
+ /**
+ * json to list
+ *
+ * @param json json string
+ * @param clazz class
+ * @param <T> T
+ * @return list
+ */
+ public static <T> List<T> toList(String json, Class<T> clazz) {
+ if (StringUtils.isEmpty(json)) {
+ return Collections.emptyList();
+ }
+
+ try {
+ CollectionType listType = OBJECT_MAPPER.getTypeFactory().constructCollectionType(ArrayList.class, clazz);
+ return OBJECT_MAPPER.readValue(json, listType);
+ } catch (Exception e) {
+ throw new RuntimeException("Json parse list exception!", e);
+ }
+ }
+
+ /**
+ * Method for finding a JSON Object field with specified name in this
+ * node or its child nodes, and returning value it has.
+ * If no matching field is found in this node or its descendants, returns null.
+ *
+ * @param jsonNode json node
+ * @param fieldName Name of field to look for
+ * @return Value of first matching node found, if any; null if none
+ */
+ public static String findValue(JsonNode jsonNode, String fieldName) {
+ JsonNode node = jsonNode.findValue(fieldName);
+
+ if (node == null) {
+ return null;
+ }
+
+ return node.asText();
+ }
+
+ /**
+ * json to map
+ * {@link #toMap(String, Class, Class)}
+ *
+ * @param json json
+ * @return json to map
+ */
+ public static Map<String, String> toMap(String json) {
+ return parseObject(json, new TypeReference<Map<String, String>>() {
+ });
+ }
+
+ public static Map<String, Object> toMap(JsonNode jsonNode) {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.convertValue(jsonNode, new TypeReference<Map<String, Object>>() {
+ });
+ }
+
+ /**
+ * json to map
+ *
+ * @param json json
+ * @param classK classK
+ * @param classV classV
+ * @param <K> K
+ * @param <V> V
+ * @return to map
+ */
+ public static <K, V> Map<K, V> toMap(String json, Class<K> classK, Class<V> classV) {
+ if (StringUtils.isEmpty(json)) {
+ return Collections.emptyMap();
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(json, new TypeReference<Map<K, V>>() {
+ });
+ } catch (Exception e) {
+ throw new RuntimeException("json to map exception!", e);
+ }
+ }
+
+ /**
+ * json to object
+ *
+ * @param json json string
+ * @param type type reference
+ * @param <T> type
+ * @return return parse object
+ */
+ public static <T> T parseObject(String json, TypeReference<T> type) {
+ if (StringUtils.isEmpty(json)) {
+ return null;
+ }
+
+ try {
+ return OBJECT_MAPPER.readValue(json, type);
+ } catch (Exception e) {
+ throw new RuntimeException("Json parse object exception.", e);
+ }
+ }
+
+ /**
+ * object to json string
+ *
+ * @param object object
+ * @return json string
+ */
+ public static String toJsonString(Object object) {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(object);
+ } catch (Exception e) {
+ throw new RuntimeException("Object json deserialization exception.", e);
+ }
+ }
+
+ public static ObjectNode parseObject(String text) {
+ try {
+ if (text.isEmpty()) {
+ return parseObject(text, ObjectNode.class);
+ } else {
+ return (ObjectNode) OBJECT_MAPPER.readTree(text);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("String json deserialization exception.", e);
+ }
+ }
+
+ public static ArrayNode parseArray(String text) {
+ try {
+ return (ArrayNode) OBJECT_MAPPER.readTree(text);
+ } catch (Exception e) {
+ throw new RuntimeException("Json deserialization exception.", e);
+ }
+ }
+
+ /**
+ * json serializer
+ */
+ public static class JsonDataSerializer extends JsonSerializer<String> {
+
+ @Override
+ public void serialize(String value, JsonGenerator gen, SerializerProvider provider) throws IOException {
+ gen.writeRawValue(value);
+ }
+
+ }
+
+ /**
+ * json data deserializer
+ */
+ public static class JsonDataDeserializer extends JsonDeserializer<String> {
+
+ @Override
+ public String deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
+ JsonNode node = p.getCodec().readTree(p);
+ if (node instanceof TextNode) {
+ return node.asText();
+ } else {
+ return node.toString();
+ }
+ }
+
+ }
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
index bf198afd..1268b7e3 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/pom.xml
@@ -36,6 +36,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
index 2e6d121d..86e8f76d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/client/ClickhouseClient.java
@@ -24,14 +24,14 @@ import static org.apache.seatunnel.flink.clickhouse.ConfigKey.PASSWORD;
import static org.apache.seatunnel.flink.clickhouse.ConfigKey.USERNAME;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.flink.clickhouse.pojo.DistributedEngine;
import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
import org.apache.seatunnel.flink.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.TypeReference;
+import com.fasterxml.jackson.core.type.TypeReference;
import ru.yandex.clickhouse.BalancedClickhouseDataSource;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;
@@ -191,7 +191,7 @@ public class ClickhouseClient {
String engine = resultSet.getString(1);
String createTableDDL = resultSet.getString(2);
String engineFull = resultSet.getString(3);
- List<String> dataPaths = JSON.parseObject(resultSet.getString(4).replaceAll("'", "\""), new TypeReference<List<String>>() {
+ List<String> dataPaths = JsonUtils.parseObject(resultSet.getString(4).replaceAll("'", "\""), new TypeReference<List<String>>() {
});
DistributedEngine distributedEngine = null;
if ("Distributed".equals(engine)) {
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml
index 4eb30c94..7814f375 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/pom.xml
@@ -36,6 +36,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
index 1adee168..dca0c26c 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
@@ -20,12 +20,13 @@ package org.apache.seatunnel.flink.druid.sink;
import static org.apache.flink.api.java.io.CsvInputFormat.DEFAULT_FIELD_DELIMITER;
import static org.apache.flink.api.java.io.CsvInputFormat.DEFAULT_LINE_DELIMITER;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -122,14 +123,15 @@ public class DruidOutputFormat extends RichOutputFormat<Row> {
mapper.configure(SerializationFeature.INDENT_OUTPUT, false);
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
String taskJSON = mapper.writeValueAsString(indexTask);
- JSONObject jsonObject = JSON.parseObject(taskJSON);
+ ObjectNode jsonObject = JsonUtils.parseObject(taskJSON);
jsonObject.remove("id");
jsonObject.remove("groupId");
jsonObject.remove("resource");
- JSONObject spec = jsonObject.getJSONObject("spec");
+
+ ObjectNode spec = (ObjectNode) jsonObject.get("spec");
spec.remove("tuningConfig");
jsonObject.put("spec", spec);
- taskJSON = jsonObject.toJSONString();
+ taskJSON = jsonObject.toString();
URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/pom.xml
index 3d99c82d..31bdf4f6 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/pom.xml
@@ -36,6 +36,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/JsonRowOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/JsonRowOutputFormat.java
index bc2fc169..efcbceae 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/JsonRowOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/JsonRowOutputFormat.java
@@ -17,8 +17,12 @@
package org.apache.seatunnel.flink.file.sink;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -31,7 +35,6 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;
-import java.util.Arrays;
public class JsonRowOutputFormat extends FileOutputFormat<Row> {
@@ -86,37 +89,39 @@ public class JsonRowOutputFormat extends FileOutputFormat<Row> {
@Override
public void writeRecord(Row record) throws IOException {
- final JSONObject json = getJson(record, rowTypeInfo);
+ final ObjectNode json = getJson(record, rowTypeInfo);
byte[] bytes = json.toString().getBytes(charset);
this.stream.write(bytes);
this.stream.write(NEWLINE);
}
- private JSONObject getJson(Row record, RowTypeInfo rowTypeInfo) {
+ private ObjectNode getJson(Row record, RowTypeInfo rowTypeInfo) {
String[] fieldNames = rowTypeInfo.getFieldNames();
int i = 0;
- JSONObject json = new JSONObject();
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode json = mapper.createObjectNode();
for (String name : fieldNames) {
Object field = record.getField(i);
+ JsonNode fieldNode = JsonUtils.toJsonNode(field);
final TypeInformation type = rowTypeInfo.getTypeAt(i);
if (type instanceof AtomicType) {
- json.put(name, field);
+ json.set(name, fieldNode);
} else if (type instanceof ObjectArrayTypeInfo) {
ObjectArrayTypeInfo arrayTypeInfo = (ObjectArrayTypeInfo) type;
TypeInformation componentInfo = arrayTypeInfo.getComponentInfo();
- JSONArray jsonArray = new JSONArray();
+ ArrayNode jsonArray = mapper.createArrayNode();
if (componentInfo instanceof RowTypeInfo) {
final Row[] rows = (Row[]) field;
for (Row r : rows) {
jsonArray.add(getJson(r, (RowTypeInfo) componentInfo));
}
} else {
- jsonArray.addAll(Arrays.asList((Object[]) field));
+ jsonArray.add(fieldNode);
}
- json.put(name, jsonArray);
+ json.set(name, jsonArray);
} else if (type instanceof RowTypeInfo) {
RowTypeInfo typeInfo = (RowTypeInfo) type;
- json.put(name, getJson((Row) field, typeInfo));
+ json.set(name, getJson((Row) field, typeInfo));
}
i++;
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
index 02a88440..32a208a3 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/FileSource.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.flink.file.source;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchSource;
import org.apache.seatunnel.flink.enums.FormatType;
@@ -26,8 +28,9 @@ import org.apache.seatunnel.flink.util.SchemaUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.service.AutoService;
import org.apache.avro.Schema;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -45,6 +48,7 @@ import org.apache.parquet.schema.MessageType;
import java.util.List;
import java.util.Map;
+@AutoService(BaseFlinkSource.class)
public class FileSource implements FlinkBatchSource {
private static final long serialVersionUID = -5206798549756998426L;
@@ -91,7 +95,7 @@ public class FileSource implements FlinkBatchSource {
Path filePath = new Path(path);
switch (format) {
case JSON:
- JSONObject jsonSchemaInfo = JSONObject.parseObject(config.getString(SCHEMA));
+ ObjectNode jsonSchemaInfo = JsonUtils.parseObject(config.getString(SCHEMA));
RowTypeInfo jsonInfo = SchemaUtil.getTypeInformation(jsonSchemaInfo);
inputFormat = new JsonRowInputFormat(filePath, null, jsonInfo);
break;
@@ -104,7 +108,7 @@ public class FileSource implements FlinkBatchSource {
this.inputFormat = new OrcRowInputFormat(path, config.getString(SCHEMA), null, DEFAULT_BATCH_SIZE);
break;
case CSV:
- List<Map<String, String>> csvSchemaInfo = JSONObject.parseObject(config.getString(SCHEMA),
+ List<Map<String, String>> csvSchemaInfo = JsonUtils.parseObject(config.getString(SCHEMA),
new TypeReference<List<Map<String, String>>>() {
});
TypeInformation<?>[] csvType = SchemaUtil.getCsvType(csvSchemaInfo);
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
index 1fcf26e1..bac38358 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/source/JsonRowInputFormat.java
@@ -17,8 +17,10 @@
package org.apache.seatunnel.flink.file.source;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -57,7 +59,7 @@ public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements Res
}
String str = new String(bytes, offset, numBytes, this.charsetName);
- JSONObject json = JSONObject.parseObject(str);
+ ObjectNode json = JsonUtils.parseObject(str);
Row reuseRow;
if (reuse == null) {
reuseRow = new Row(rowTypeInfo.getArity());
@@ -68,26 +70,26 @@ public class JsonRowInputFormat extends DelimitedInputFormat<Row> implements Res
return reuseRow;
}
- private void setJsonRow(Row row, JSONObject json, RowTypeInfo rowTypeInfo) {
+ private void setJsonRow(Row row, ObjectNode json, RowTypeInfo rowTypeInfo) {
String[] fieldNames = rowTypeInfo.getFieldNames();
int i = 0;
for (String name : fieldNames) {
Object value = json.get(name);
- if (value instanceof JSONObject) {
+ if (value instanceof ObjectNode) {
TypeInformation information = rowTypeInfo.getTypeAt(name);
Row r = new Row(information.getArity());
- setJsonRow(r, (JSONObject) value, (RowTypeInfo) information);
+ setJsonRow(r, (ObjectNode) value, (RowTypeInfo) information);
row.setField(i++, r);
- } else if (value instanceof JSONArray) {
+ } else if (value instanceof ArrayNode) {
ObjectArrayTypeInfo information = (ObjectArrayTypeInfo) rowTypeInfo.getTypeAt(name);
- JSONArray array = (JSONArray) value;
+ ArrayNode array = (ArrayNode) value;
Object[] objects = new Object[array.size()];
int j = 0;
for (Object o : array) {
- if (o instanceof JSONObject) {
+ if (o instanceof ObjectNode) {
TypeInformation componentInfo = information.getComponentInfo();
Row r = new Row(componentInfo.getArity());
- setJsonRow(r, (JSONObject) o, (RowTypeInfo) componentInfo);
+ setJsonRow(r, (ObjectNode) o, (RowTypeInfo) componentInfo);
objects[j++] = r;
} else {
objects[j++] = o;
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index 1aacf290..fa5c3d99 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.common.PropertiesUtil;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.flink.BaseFlinkSource;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.enums.FormatType;
import org.apache.seatunnel.flink.stream.FlinkStreamSource;
@@ -29,8 +31,8 @@ import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.parser.Feature;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.auto.service.AutoService;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
@@ -43,9 +45,10 @@ import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+@AutoService(BaseFlinkSource.class)
public class KafkaTableStream implements FlinkStreamSource {
private static final long serialVersionUID = 5287018194573371428L;
@@ -108,7 +111,7 @@ public class KafkaTableStream implements FlinkStreamSource {
}
String schemaContent = config.getString(SCHEMA);
format = FormatType.from(config.getString(SOURCE_FORMAT).trim().toLowerCase());
- schemaInfo = JSONObject.parse(schemaContent, Feature.OrderedField);
+ schemaInfo = JsonUtils.parseObject(schemaContent);
}
@Override
@@ -155,9 +158,10 @@ public class KafkaTableStream implements FlinkStreamSource {
kafka.startFromEarliest();
break;
case "specific":
+ //todo Is json format?
String offset = config.getString("offset.reset.specific");
- HashMap<Integer, Long> map = new HashMap<>(DEFAULT_INITIAL_CAPACITY);
- JSONObject.parseObject(offset).forEach((k, v) -> map.put(Integer.valueOf(k), Long.valueOf(v.toString())));
+ Map<Integer, Long> map = JsonUtils.parseObject(offset, new TypeReference<Map<Integer, Long>>() {
+ });
kafka.startFromSpecificOffsets(map);
break;
default:
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 84dfac44..2e7e4e10 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.utils.JsonUtils;
-import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.HashMap;
@@ -45,6 +45,6 @@ public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<Str
for (int i = 0; i < fieldNames.length; i++) {
map.put(fieldNames[i], fields[i]);
}
- return new ProducerRecord<>(topic, null, JSON.toJSONString(map));
+ return new ProducerRecord<>(topic, null, JsonUtils.toJsonString(map));
}
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
index fbe4353c..039d8da2 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/pom.xml
@@ -51,10 +51,5 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>fastjson</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
index 9033d8a0..27063b45 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-feishu/src/main/scala/org/apache/seatunnel/spark/feishu/FeishuClient.scala
@@ -16,17 +16,19 @@
*/
package org.apache.seatunnel.spark.feishu
-import scala.collection.mutable.ArrayBuffer
-
-import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
-import org.apache.http.{HttpEntity, HttpHeaders}
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode}
import org.apache.http.client.methods.{CloseableHttpResponse, RequestBuilder}
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils
+import org.apache.http.{HttpEntity, HttpHeaders}
+import org.apache.seatunnel.common.utils.JsonUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.slf4j.{Logger, LoggerFactory}
+import scala.collection.mutable.ArrayBuffer
+
class FeishuClient(appId: String, appSecret: String) {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
@@ -34,7 +36,7 @@ class FeishuClient(appId: String, appSecret: String) {
val url = Config.TOKEN_URL.format(appId, appSecret)
val result = this.requestFeishuApi(url, null)
logger.info(s"Request token and get result $result")
- result.getString(Config.TENANT_ACCESS_TOKEN)
+ result.get(Config.TENANT_ACCESS_TOKEN).toString
}
def getSheetId(sheetToken: String, sheetNum: Int): String = {
@@ -45,40 +47,41 @@ class FeishuClient(appId: String, appSecret: String) {
"Did not get any sheet in Feishu, please make sure there is correct sheet token!")
}
- val sheets = data.getJSONArray(Config.SHEETS)
+ val sheets = data.get(Config.SHEETS)
if (null == sheets || sheets.size() < sheetNum) {
throw new RuntimeException(s"The sheet $sheetNum is does not exists")
}
- val sheetInfo = sheets.getJSONObject(sheetNum - 1)
- sheetInfo.getString(Config.SHEET_ID)
+ val sheetInfo = sheets.get(sheetNum - 1)
+ sheetInfo.get(Config.SHEET_ID).toString
}
- def getSheetData(sheetToken: String, range: String, sheetId: String): JSONArray = {
+ def getSheetData(sheetToken: String, range: String, sheetId: String): ArrayNode = {
var rangeNew = range
// the range format is xxx!A1:C3
if (!"".equals(rangeNew)) {
rangeNew = "!" + rangeNew
}
val url = Config.SHEET_DATA_URL.format(sheetToken, sheetId, rangeNew)
- val data = this.requestFeishuApiAndGetData(url)
+ val data = this.requestFeishuApiAndGetData(url).asInstanceOf[ArrayNode]
if (null == data) {
throw new RuntimeException("The data is empty, please make sure some data in sheet.")
}
- val valueRange = data.getJSONObject(Config.VALUE_RANGE)
+ val valueRange = data.get(Config.VALUE_RANGE)
if (null == valueRange) {
throw new RuntimeException("The data is empty, please make sure some data in sheet.")
}
- valueRange.getJSONArray(Config.VALUES)
+ valueRange.get(Config.VALUES).asInstanceOf[ArrayNode]
}
+ // todo: add UT
def getDataset(
- sheetToken: String,
- range: String,
- titleLineNum: Int,
- ignoreTitleLine: Boolean,
- sheetNum: Int): (ArrayBuffer[Row], StructType) = {
+ sheetToken: String,
+ range: String,
+ titleLineNum: Int,
+ ignoreTitleLine: Boolean,
+ sheetNum: Int): (ArrayBuffer[Row], StructType) = {
val sheetId = this.getSheetId(sheetToken, sheetNum)
val values = getSheetData(sheetToken, range, sheetId)
if (values.size() < titleLineNum) {
@@ -92,24 +95,24 @@ class FeishuClient(appId: String, appSecret: String) {
}
var schema: StructType = null
- val schemaData = values.getJSONArray(titleLineNum - 1)
+ val schemaData = values.get(titleLineNum - 1).asInstanceOf[ArrayNode]
val fields = ArrayBuffer[StructField]()
for (index <- 0 until schemaData.size()) {
- val titleName = schemaData.getString(index)
+ val titleName = schemaData.get(index)
if (null == titleName) {
throw new RuntimeException("The title name is not allowed null")
}
- val field = DataTypes.createStructField(titleName, DataTypes.StringType, true)
+ val field = DataTypes.createStructField(titleName.toString, DataTypes.StringType, true)
fields += field
}
schema = DataTypes.createStructType(fields.toArray)
val rows = ArrayBuffer[Row]()
for (index <- start until values.size()) {
- val jsonArr = values.getJSONArray(index)
+ val jsonArr = values.get(index).asInstanceOf[ArrayNode]
val arr = ArrayBuffer[String]()
for (indexInner <- 0 until jsonArr.size()) {
- arr += jsonArr.getString(indexInner)
+ arr += jsonArr.get(indexInner).toString
}
val row = Row.fromSeq(arr)
@@ -118,12 +121,12 @@ class FeishuClient(appId: String, appSecret: String) {
(rows, schema)
}
- def requestFeishuApiAndGetData(url: String): JSONObject = {
+ def requestFeishuApiAndGetData(url: String): JsonNode = {
val result = this.requestFeishuApi(url, this.getToken)
- result.getJSONObject(Config.DATA)
+ result.get(Config.DATA)
}
- def requestFeishuApi(url: String, token: String): JSONObject = {
+ def requestFeishuApi(url: String, token: String): ObjectNode = {
val httpGet = RequestBuilder.get()
.setUri(url)
.setHeader(HttpHeaders.AUTHORIZATION, s"Bearer $token")
@@ -148,10 +151,10 @@ class FeishuClient(appId: String, appSecret: String) {
}
}
- val result = JSON.parseObject(resultStr)
- val code = result.getIntValue(Config.CODE)
+ val result = JsonUtils.parseObject(resultStr)
+ val code = result.get(Config.CODE).asInt()
if (code != 0) {
- val errorMessage = result.getString(Config.MSG)
+ val errorMessage = result.get(Config.MSG).toString
throw new RuntimeException(
s"Request feishu api error, the code is: $code and msg is: $errorMessage")
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/pom.xml b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/pom.xml
index 75430b85..da7c82ad 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/pom.xml
@@ -32,6 +32,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
index e565d414..61b2d939 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-mongodb/src/main/scala/org/apache/seatunnel/spark/mongodb/source/MongoDB.scala
@@ -17,11 +17,10 @@
package org.apache.seatunnel.spark.mongodb.source
import scala.collection.JavaConversions._
-
-import com.alibaba.fastjson.JSON
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.seatunnel.common.config.{CheckConfigUtil, CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.utils.JsonUtils
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSource
import org.apache.seatunnel.spark.utils.SparkStructTypeUtil
@@ -49,7 +48,7 @@ class MongoDB extends SparkBatchSource {
})
if (config.hasPath("schema")) {
- val schemaJson = JSON.parseObject(config.getString("schema"))
+ val schemaJson = JsonUtils.parseObject(config.getString("schema"))
schema = SparkStructTypeUtil.getStructType(schema, schemaJson)
}
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index c95ba410..f8a69980 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -267,7 +267,6 @@ The text of each license is the standard Apache 2.0 license.
(Apache 2) chill-java (com.twitter:chill-java:0.7.6 - https://github.com/twitter/chill)
(Apache 2) chill-java (com.twitter:chill-java:0.8.4 - https://github.com/twitter/chill)
(Apache 2) chill-java (com.twitter:chill-java:0.9.3 - https://github.com/twitter/chill)
- (Apache 2) fastjson (com.alibaba:fastjson:1.2.80 - https://github.com/alibaba/fastjson)
(Apache 2) opencsv (com.opencsv:opencsv:4.6 - http://opencsv.sf.net)
(Apache 2) opencsv (net.sf.opencsv:opencsv:2.3 - http://opencsv.sf.net)
(Apache 2) org.roaringbitmap:RoaringBitmap (org.roaringbitmap:RoaringBitmap:0.9.0 - https://github.com/RoaringBitmap/RoaringBitmap)
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index dc1d88ff..956f1df1 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -161,7 +161,6 @@ error_prone_annotations-2.3.4.jar
error_prone_annotations-2.8.0.jar
esri-geometry-api-2.2.0.jar
extendedset-0.22.1.jar
-fastjson-1.2.80.jar
fastutil-6.5.6.jar
fastutil-7.0.13.jar
fastutil-8.5.4.jar