You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/21 11:35:08 UTC
[incubator-inlong] branch master updated: [INLONG-3860][Sort] Add json(avro/debezium-json/canal-json) format for ExtractNode and LoadNode (#3865)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d89fd1f31 [INLONG-3860][Sort] Add json(avro/debezium-json/canal-json) format for ExtractNode and LoadNode (#3865)
d89fd1f31 is described below
commit d89fd1f31062d3113a14e1e940494b5a9d45b170
Author: pacino <ge...@gmail.com>
AuthorDate: Thu Apr 21 19:35:01 2022 +0800
[INLONG-3860][Sort] Add json(avro/debezium-json/canal-json) format for ExtractNode and LoadNode (#3865)
---
.../sort/protocol/node/format/AvroFormat.java | 76 +++++++++++++
.../sort/protocol/node/format/CanalJsonFormat.java | 102 +++++++++++++++++
.../protocol/node/format/DebeziumJsonFormat.java | 109 ++++++++++++++++++
.../inlong/sort/protocol/node/format/Format.java | 51 +++++++++
.../sort/protocol/node/format/JsonFormat.java | 122 +++++++++++++++++++++
.../sort/protocol/node/load/KafkaLoadNode.java | 39 ++++++-
.../apache/inlong/sort/protocol/GroupInfoTest.java | 23 ++--
.../inlong/sort/protocol/StreamInfoTest.java | 22 ++--
.../sort/protocol/node/KafkaLoadNodeTest.java | 10 +-
.../sort/protocol/node/format/AvroFormatTest.java | 32 ++++++
.../protocol/node/format/CanalJsonFormatTest.java | 32 ++++++
.../node/format/DebeziumJsonFormatTest.java | 32 ++++++
.../sort/protocol/node/format/FormatBaseTest.java | 55 ++++++++++
.../sort/protocol/node/format/JsonFormatTest.java | 32 ++++++
14 files changed, 713 insertions(+), 24 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/AvroFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/AvroFormat.java
new file mode 100644
index 000000000..3a21df8cd
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/AvroFormat.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Avro format.
+ *
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/avro/">Avro
+ * Format</a>
+ */
+@JsonTypeName("avroFormat")
+@Data
+public class AvroFormat implements Format {
+
+ private static final long serialVersionUID = 1L;
+
+ @JsonProperty(value = "codec")
+ private String codec;
+
+ @JsonCreator
+ public AvroFormat(@JsonProperty(value = "codec") String codec) {
+ this.codec = codec;
+ }
+
+ @JsonCreator
+ public AvroFormat() {
+ }
+
+ /**
+ * Return avro
+ *
+ * @return format
+ */
+ @JsonIgnore
+ @Override
+ public String getFormat() {
+ return "avro";
+ }
+
+ /**
+ * Generate options for the connector
+ *
+ * @return options
+ */
+ @Override
+ public Map<String, String> generateOptions() {
+ Map<String, String> options = new HashMap<>(4);
+ options.put("key.format", getFormat());
+ options.put("value.format", getFormat());
+ return options;
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java
new file mode 100644
index 000000000..717ca5b05
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormat.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Canal format.
+ *
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/">
+ * Canal Format</a>
+ */
+@JsonTypeName("canalJsonFormat")
+@Data
+public class CanalJsonFormat implements Format {
+
+ private static final long serialVersionUID = 1L;
+
+ @JsonProperty(value = "ignoreParseErrors", defaultValue = "true")
+ private Boolean ignoreParseErrors;
+ @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL")
+ private String timestampFormatStandard;
+ @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP")
+ private String mapNullKeyMode;
+ @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null")
+ private String mapNullKeyLiteral;
+ @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+ private Boolean encodeDecimalAsPlainNumber;
+
+ @JsonCreator
+ public CanalJsonFormat(@JsonProperty(value = "ignoreParseErrors", defaultValue = "true") Boolean ignoreParseErrors,
+ @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL") String timestampFormatStandard,
+ @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP") String mapNullKeyMode,
+ @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null") String mapNullKeyLiteral,
+ @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+ Boolean encodeDecimalAsPlainNumber) {
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.timestampFormatStandard = timestampFormatStandard;
+ this.mapNullKeyMode = mapNullKeyMode;
+ this.mapNullKeyLiteral = mapNullKeyLiteral;
+ this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ }
+
+ @JsonCreator
+ public CanalJsonFormat() {
+ this(true, "SQL", "DROP", "null", true);
+ }
+
+ /**
+ * Return canal-json
+ *
+ * @return format
+ */
+ @JsonIgnore
+ @Override
+ public String getFormat() {
+ return "canal-json";
+ }
+
+ /**
+ * Generate options for connector
+ *
+ * @return options
+ */
+ @Override
+ public Map<String, String> generateOptions() {
+ Map<String, String> options = new HashMap<>(16);
+ options.put("format", getFormat());
+ if (this.ignoreParseErrors != null) {
+ options.put("canal-json.ignore-parse-errors", this.ignoreParseErrors.toString());
+ }
+ options.put("canal-json.timestamp-format.standard", this.timestampFormatStandard);
+ options.put("canal-json.map-null-key.mode", this.mapNullKeyMode);
+ options.put("canal-json.map-null-key.literal", this.mapNullKeyLiteral);
+ if (this.encodeDecimalAsPlainNumber != null) {
+ options.put("canal-json.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString());
+ }
+ return options;
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormat.java
new file mode 100644
index 000000000..445ff93c5
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormat.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Debezium format
+ *
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/debezium">
+ * Debezium Format</a>
+ */
+@JsonTypeName("debeziumJsonFormat")
+@Data
+public class DebeziumJsonFormat implements Format {
+
+ private static final long serialVersionUID = 1L;
+
+ @JsonProperty(value = "schemaInclude", defaultValue = "false")
+ private Boolean schemaInclude;
+ @JsonProperty(value = "ignoreParseErrors", defaultValue = "true")
+ private Boolean ignoreParseErrors;
+ @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL")
+ private String timestampFormatStandard;
+ @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP")
+ private String mapNullKeyMode;
+ @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null")
+ private String mapNullKeyLiteral;
+ @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+ private Boolean encodeDecimalAsPlainNumber;
+
+ @JsonCreator
+ public DebeziumJsonFormat(@JsonProperty(value = "schemaInclude", defaultValue = "false") Boolean schemaInclude,
+ @JsonProperty(value = "ignoreParseErrors", defaultValue = "true") Boolean ignoreParseErrors,
+ @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL") String timestampFormatStandard,
+ @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP") String mapNullKeyMode,
+ @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null") String mapNullKeyLiteral,
+ @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+ Boolean encodeDecimalAsPlainNumber) {
+ this.schemaInclude = schemaInclude;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.timestampFormatStandard = timestampFormatStandard;
+ this.mapNullKeyMode = mapNullKeyMode;
+ this.mapNullKeyLiteral = mapNullKeyLiteral;
+ this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ }
+
+ @JsonCreator
+ public DebeziumJsonFormat() {
+ this(false, true, "SQL", "DROP", "null", true);
+ }
+
+ /**
+ * Return debezium-json
+ *
+ * @return format
+ */
+ @JsonIgnore
+ @Override
+ public String getFormat() {
+ return "debezium-json";
+ }
+
+ /**
+ * Generate options for connector
+ *
+ * @return options
+ */
+ @Override
+ public Map<String, String> generateOptions() {
+ Map<String, String> options = new HashMap<>(16);
+ options.put("format", getFormat());
+ if (this.schemaInclude != null) {
+ options.put("debezium-json.schema-include", this.schemaInclude.toString());
+ }
+ if (this.ignoreParseErrors != null) {
+ options.put("debezium-json.ignore-parse-errors", this.ignoreParseErrors.toString());
+ }
+ options.put("debezium-json.timestamp-format.standard", this.timestampFormatStandard);
+ options.put("debezium-json.map-null-key.mode", this.mapNullKeyMode);
+ options.put("debezium-json.map-null-key.literal", this.mapNullKeyLiteral);
+ if (this.encodeDecimalAsPlainNumber != null) {
+ options.put("debezium-json.encode.decimal-as-plain-number", this.encodeDecimalAsPlainNumber.toString());
+ }
+ return options;
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
new file mode 100644
index 000000000..5d3c2efd1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/Format.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@JsonTypeInfo(
+ use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = JsonFormat.class, name = "jsonFormat"),
+ @JsonSubTypes.Type(value = AvroFormat.class, name = "avroFormat"),
+ @JsonSubTypes.Type(value = DebeziumJsonFormat.class, name = "debeziumJsonFormat"),
+ @JsonSubTypes.Type(value = CanalJsonFormat.class, name = "canalJsonFormat")
+})
+public interface Format extends Serializable {
+
+ /**
+ * Return the format name, for example json/avro/debezium-json/canal-json
+ *
+ * @return format
+ */
+ String getFormat();
+
+ /**
+ * Generate options for the connector
+ *
+ * @return options
+ */
+ Map<String, String> generateOptions();
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/JsonFormat.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/JsonFormat.java
new file mode 100644
index 000000000..3a183cce9
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/format/JsonFormat.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import lombok.Data;
+import lombok.ToString;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Json format
+ *
+ * @see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/json/">
+ * Json Format</a>
+ */
+
+@JsonTypeName("jsonFormat")
+@Data
+@ToString
+public class JsonFormat implements Format {
+
+ private static final long serialVersionUID = 1L;
+
+ @JsonProperty(value = "failOnMissingField", defaultValue = "false")
+ private Boolean failOnMissingField;
+ @JsonProperty(value = "ignoreParseErrors", defaultValue = "true")
+ private Boolean ignoreParseErrors;
+ @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL")
+ private String timestampFormatStandard;
+ @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP")
+ private String mapNullKeyMode;
+ @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null")
+ private String mapNullKeyLiteral;
+ @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+ private Boolean encodeDecimalAsPlainNumber;
+
+ @JsonCreator
+ public JsonFormat(@JsonProperty(value = "failOnMissingField", defaultValue = "false") Boolean failOnMissingField,
+ @JsonProperty(value = "ignoreParseErrors", defaultValue = "true") Boolean ignoreParseErrors,
+ @JsonProperty(value = "timestampFormatStandard", defaultValue = "SQL") String timestampFormatStandard,
+ @JsonProperty(value = "mapNullKeyMode", defaultValue = "DROP") String mapNullKeyMode,
+ @JsonProperty(value = "mapNullKeyLiteral", defaultValue = "null") String mapNullKeyLiteral,
+ @JsonProperty(value = "encodeDecimalAsPlainNumber", defaultValue = "true")
+ Boolean encodeDecimalAsPlainNumber) {
+ this.failOnMissingField = failOnMissingField;
+ this.ignoreParseErrors = ignoreParseErrors;
+ this.timestampFormatStandard = timestampFormatStandard;
+ this.mapNullKeyMode = mapNullKeyMode;
+ this.mapNullKeyLiteral = mapNullKeyLiteral;
+ this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
+ }
+
+ @JsonCreator
+ public JsonFormat() {
+ this(false, true, "SQL", "DROP", "null", true);
+ }
+
+ /**
+ * Return json
+ *
+ * @return format
+ */
+ @JsonIgnore
+ @Override
+ public String getFormat() {
+ return "json";
+ }
+
+ /**
+ * Generate options for connector
+ *
+ * @return options
+ */
+ @Override
+ public Map<String, String> generateOptions() {
+ Map<String, String> options = new HashMap<>(32);
+ options.put("key.format", getFormat());
+ options.put("value.format", getFormat());
+ if (this.failOnMissingField != null) {
+ String failOnMissingField = this.failOnMissingField.toString();
+ options.put("value.json.fail-on-missing-field", failOnMissingField);
+ options.put("key.json.fail-on-missing-field", failOnMissingField);
+ }
+ if (this.ignoreParseErrors != null) {
+ String ignoreParseErrors = this.ignoreParseErrors.toString();
+ options.put("value.json.ignore-parse-errors", ignoreParseErrors);
+ options.put("key.json.ignore-parse-errors", ignoreParseErrors);
+ }
+ options.put("value.json.timestamp-format.standard", this.timestampFormatStandard);
+ options.put("value.json.map-null-key.mode", this.mapNullKeyMode);
+ options.put("value.json.map-null-key.literal", this.mapNullKeyLiteral);
+ if (this.encodeDecimalAsPlainNumber != null) {
+ String encodeDecimalAsPlainNumber = this.encodeDecimalAsPlainNumber.toString();
+ options.put("value.json.encode.decimal-as-plain-number", encodeDecimalAsPlainNumber);
+ options.put("key.json.encode.decimal-as-plain-number", encodeDecimalAsPlainNumber);
+ }
+ options.put("key.json.timestamp-format.standard", this.timestampFormatStandard);
+ options.put("key.json.map-null-key.mode", this.mapNullKeyMode);
+ options.put("key.json.map-null-key.literal", this.mapNullKeyLiteral);
+ return options;
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
index 500f82c31..3483ee68a 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/KafkaLoadNode.java
@@ -21,10 +21,16 @@ import com.google.common.base.Preconditions;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.format.AvroFormat;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
import org.apache.inlong.sort.protocol.transformation.FilterFunction;
@@ -51,7 +57,10 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
private String bootstrapServers;
@Nonnull
@JsonProperty("format")
- private String format;
+ private Format format;
+
+ @JsonProperty("primaryKey")
+ private String primaryKey;
public KafkaLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
@@ -60,13 +69,20 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
@JsonProperty("filters") List<FilterFunction> filters,
@Nonnull @JsonProperty("topic") String topic,
@Nonnull @JsonProperty("bootstrapServers") String bootstrapServers,
- @Nonnull @JsonProperty("format") String format,
+ @Nonnull @JsonProperty("format") Format format,
@Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
- @JsonProperty("properties") Map<String, String> properties) {
+ @JsonProperty("properties") Map<String, String> properties,
+ @JsonProperty("primaryKey") String primaryKey) {
super(id, name, fields, fieldRelationShips, filters, sinkParallelism, properties);
this.topic = Preconditions.checkNotNull(topic, "topic is null");
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrapServers is null");
this.format = Preconditions.checkNotNull(format, "format is null");
+ if (format instanceof JsonFormat && StringUtils.isEmpty(primaryKey)) {
+ throw new IllegalArgumentException("primaryKey is empty when format is json");
+ } else if (format instanceof AvroFormat && StringUtils.isEmpty(primaryKey)) {
+ throw new IllegalArgumentException("primaryKey is empty when format is avro");
+ }
+ this.primaryKey = primaryKey;
}
@Override
@@ -74,16 +90,29 @@ public class KafkaLoadNode extends LoadNode implements Serializable {
return "node_" + super.getId() + "_" + topic;
}
+ /**
+ * Generate options for kafka connector
+ *
+ * @return options
+ */
@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
- options.put("connector", "kafka");
options.put("topic", topic);
options.put("properties.bootstrap.servers", bootstrapServers);
- options.put("format", format);
if (getSinkParallelism() != null) {
options.put("sink.parallelism", getSinkParallelism().toString());
}
+ if (format instanceof JsonFormat || format instanceof AvroFormat) {
+ options.put("connector", "upsert-kafka");
+ } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
+ options.put("connector", "kafka");
+ } else {
+ throw new IllegalArgumentException("kafka load Node format is IllegalArgument");
+ }
+ if (format.generateOptions() != null && !format.generateOptions().isEmpty()) {
+ options.putAll(format.generateOptions());
+ }
return options;
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
index 1a6805209..a14f0f515 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/GroupInfoTest.java
@@ -26,6 +26,7 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -39,6 +40,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+
import static org.junit.Assert.assertEquals;
/**
@@ -78,8 +80,8 @@ public class GroupInfoTest {
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
- "topic", "localhost:9092", "json",
- 1, null);
+ "topic", "localhost:9092", new JsonFormat(),
+ 1, null, "id");
}
private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -135,9 +137,12 @@ public class GroupInfoTest {
+ "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
- + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
- + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
- + "\"outputs\":[\"2\"]}]}]}";
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
+ + "\"format\":{\"type\":\"jsonFormat\",\"failOnMissingField\":false,\"ignoreParseErrors\":true,"
+ + "\"timestampFormatStandard\":\"SQL\",\"mapNullKeyMode\":\"DROP\","
+ + "\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
+ + "\"sinkParallelism\":1,\"primaryKey\":\"id\"}],\"relations\":[{\"type\":\"baseRelation\",\""
+ + "inputs\":[\"1\"],\"outputs\":[\"2\"]}]}]}";
assertEquals(expected, objectMapper.writeValueAsString(groupInfo));
}
@@ -185,9 +190,11 @@ public class GroupInfoTest {
+ "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
- + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
- + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
- + "\"outputs\":[\"2\"]}]}]}";
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
+ + "\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
+ + "\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
+ + "\"sinkParallelism\":1,\"primaryKey\":\"id\"}],\"relations\":[{\"type\":\"baseRelation\","
+ + "\"inputs\":[\"1\"],\"outputs\":[\"2\"]}]}]}";
GroupInfo expected = objectMapper.readValue(groupInfoStr, GroupInfo.class);
assertEquals(expected, groupInfo);
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
index 4c65eda6e..5c0f630c6 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/StreamInfoTest.java
@@ -26,6 +26,7 @@ import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.node.Node;
import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.ConstantParam;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -39,6 +40,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+
import static org.junit.Assert.assertEquals;
/**
@@ -78,8 +80,8 @@ public class StreamInfoTest {
new FieldInfo("ts", new TimestampFormatInfo()))
);
return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
- "topic", "localhost:9092", "json",
- 1, null);
+ "topic", "localhost:9092", new JsonFormat(),
+ 1, null, "id");
}
private NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node> outputs) {
@@ -130,9 +132,11 @@ public class StreamInfoTest {
+ "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
- + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
- + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
- + "\"outputs\":[\"2\"]}]}";
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
+ + "\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
+ + "\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
+ + "\"sinkParallelism\":1,\"primaryKey\":\"id\"}],\"relations\":[{\"type\":\"baseRelation\","
+ + "\"inputs\":[\"1\"],\"outputs\":[\"2\"]}]}";
assertEquals(expected, objectMapper.writeValueAsString(streamInfo));
}
@@ -178,9 +182,11 @@ public class StreamInfoTest {
+ "\"inputField\":{\"type\":\"base\",\"name\":\"ts\",\"formatInfo\":{\"type\":\"timestamp\","
+ "\"format\":\"yyyy-MM-dd HH:mm:ss\"}},\"outputField\":{\"type\":\"base\",\"name\":\"ts\","
+ "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}}}],"
- + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":\"json\","
- + "\"sinkParallelism\":1}],\"relations\":[{\"type\":\"baseRelation\",\"inputs\":[\"1\"],"
- + "\"outputs\":[\"2\"]}]}";
+ + "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\",\"format\":{\"type\":\"jsonFormat\","
+ + "\"failOnMissingField\":false,\"ignoreParseErrors\":true,\"timestampFormatStandard\":\"SQL\","
+ + "\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\",\"encodeDecimalAsPlainNumber\":true},"
+ + "\"sinkParallelism\":1,\"primaryKey\":\"id\"}],\"relations\":[{\"type\":\"baseRelation\","
+ + "\"inputs\":[\"1\"],\"outputs\":[\"2\"]}]}";
StreamInfo expected = objectMapper.readValue(streamInfoStr, StreamInfo.class);
assertEquals(expected, streamInfo);
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
index b11956474..4064d177b 100644
--- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/KafkaLoadNodeTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.protocol.node;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -33,8 +34,8 @@ public class KafkaLoadNodeTest extends NodeBaseTest {
Arrays.asList(new FieldInfo("field", new StringFormatInfo())),
Arrays.asList(new FieldRelationShip(new FieldInfo("field", new StringFormatInfo()),
new FieldInfo("field", new StringFormatInfo()))), null,
- "topic", "localhost:9092", "json",
- 1, new TreeMap<>());
+ "topic", "localhost:9092", new CanalJsonFormat(),
+ 1, new TreeMap<>(), null);
}
@Override
@@ -44,6 +45,9 @@ public class KafkaLoadNodeTest extends NodeBaseTest {
+ "\"inputField\":{\"type\":\"base\",\"name\":\"field\",\"formatInfo\":{\"type\":\"string\"}},"
+ "\"outputField\":{\"type\":\"base\",\"name\":\"field\",\"formatInfo\":{\"type\":\"string\"}}}],"
+ "\"topic\":\"topic\",\"bootstrapServers\":\"localhost:9092\","
- + "\"format\":\"json\",\"sinkParallelism\":1,\"properties\":{}}";
+ + "\"format\":{\"type\":\"canalJsonFormat\",\"ignoreParseErrors\":true,"
+ + "\"timestampFormatStandard\":\"SQL\",\"mapNullKeyMode\":\"DROP\",\"mapNullKeyLiteral\":\"null\","
+ + "\"encodeDecimalAsPlainNumber\":true},\"sinkParallelism\":1,"
+ + "\"properties\":{}}";
}
}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/AvroFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/AvroFormatTest.java
new file mode 100644
index 000000000..6d5de8df1
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/AvroFormatTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+public class AvroFormatTest extends FormatBaseTest {
+
+    /**
+     * Supply the Avro format whose JSON round trip is checked by {@link FormatBaseTest}.
+     *
+     * @return a new {@link AvroFormat} created with its no-arg constructor
+     * @see Format
+     */
+    @Override
+    public Format getFormat() {
+        return new AvroFormat();
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormatTest.java
new file mode 100644
index 000000000..bdaf845cf
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/CanalJsonFormatTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+public class CanalJsonFormatTest extends FormatBaseTest {
+
+    /**
+     * Supply the canal-json format whose JSON round trip is checked by {@link FormatBaseTest}.
+     *
+     * @return a new {@link CanalJsonFormat} created with its no-arg constructor
+     * @see Format
+     */
+    @Override
+    public Format getFormat() {
+        return new CanalJsonFormat();
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormatTest.java
new file mode 100644
index 000000000..6793dde30
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/DebeziumJsonFormatTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+public class DebeziumJsonFormatTest extends FormatBaseTest {
+
+    /**
+     * Supply the debezium-json format whose JSON round trip is checked by {@link FormatBaseTest}.
+     *
+     * @return a new {@link DebeziumJsonFormat} created with its no-arg constructor
+     * @see Format
+     */
+    @Override
+    public Format getFormat() {
+        return new DebeziumJsonFormat();
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/FormatBaseTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/FormatBaseTest.java
new file mode 100644
index 000000000..e476ce96e
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/FormatBaseTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class FormatBaseTest {
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private Format format;
+
+    /**
+     * Obtain the format under test from the subclass before each test method runs.
+     */
+    @Before
+    public void before() {
+        this.format = Preconditions.checkNotNull(getFormat(), "getFormat() must not return null");
+    }
+
+    /** @return a new instance of the format under test; must not be null */
+    public abstract Format getFormat();
+
+    /**
+     * Serialize the format to JSON, read it back as {@link Format}, and check it equals the original.
+     * @throws JsonProcessingException if serializing or deserializing the format fails
+     */
+    @Test
+    public void testSerializeAndDeserialize() throws JsonProcessingException {
+        String jsonStr = objectMapper.writeValueAsString(format);
+        Format deserializeFormat = objectMapper.readValue(jsonStr, Format.class);
+        assertEquals(format, deserializeFormat);
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/JsonFormatTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/JsonFormatTest.java
new file mode 100644
index 000000000..4b777a2a8
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/format/JsonFormatTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.format;
+
+public class JsonFormatTest extends FormatBaseTest {
+
+    /**
+     * Supply the plain json format whose JSON round trip is checked by {@link FormatBaseTest}.
+     *
+     * @return a new {@link JsonFormat} created with its no-arg constructor
+     * @see Format
+     */
+    @Override
+    public Format getFormat() {
+        return new JsonFormat();
+    }
+}