You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/28 09:54:25 UTC
[inlong] branch master updated: [INLONG-6617][Sort] Add common process for dirty data sink and supports log sink (#6618)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ff173854f [INLONG-6617][Sort] Add common process for dirty data sink and supports log sink (#6618)
ff173854f is described below
commit ff173854fcd4f2c11211be13923f809bb799495c
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon Nov 28 17:54:19 2022 +0800
[INLONG-6617][Sort] Add common process for dirty data sink and supports log sink (#6618)
---
.../org/apache/inlong/sort/base/Constants.java | 103 ++++++++++++-
.../apache/inlong/sort/base/dirty/DirtyData.java | 143 ++++++++++++++++++
.../inlong/sort/base/dirty/DirtyOptions.java | 115 +++++++++++++++
.../apache/inlong/sort/base/dirty/DirtyType.java | 77 ++++++++++
.../inlong/sort/base/dirty/sink/DirtySink.java | 59 ++++++++
.../sort/base/dirty/sink/DirtySinkFactory.java | 36 +++++
.../sort/base/dirty/sink/log/LogDirtySink.java | 122 +++++++++++++++
.../base/dirty/sink/log/LogDirtySinkFactory.java | 64 ++++++++
.../inlong/sort/base/dirty/utils/FormatUtils.java | 164 +++++++++++++++++++++
.../apache/inlong/sort/base/util/LabelUtils.java | 66 +++++++++
.../inlong/sort/base/util/PatternReplaceUtils.java | 46 ++++++
.../org.apache.flink.table.factories.Factory | 16 ++
.../inlong/sort/base/dirty/FormatUtilsTest.java | 100 +++++++++++++
.../inlong/sort/base/util/LabelUtilsTest.java | 39 +++++
.../sort/base/util/PatternReplaceUtilsTest.java | 40 +++++
15 files changed, 1189 insertions(+), 1 deletion(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index d99fc8012..0b91fcfce 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -74,9 +74,13 @@ public final class Constants {
*/
public static final String NODE_ID = "nodeId";
/**
- * It is used for inlong.metric
+ * It is used for 'inlong.metric.labels' or 'dirty.side-output.labels'
*/
public static final String DELIMITER = "&";
+ /**
+ * The delimiter of key and value, it is used for 'inlong.metric.labels' or 'dirty.side-output.labels'
+ */
+ public static final String KEY_VALUE_DELIMITER = "=";
// sort received successfully
public static final Integer AUDIT_SORT_INPUT = 7;
@@ -177,4 +181,101 @@ public final class Constants {
.defaultValue(false)
.withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and"
+ "`time`, so type conversions must be mapped to types supported by spark.");
+
+    // ========================================= dirty configuration =========================================
+    public static final String DIRTY_PREFIX = "dirty.";
+
+    public static final ConfigOption<Boolean> DIRTY_IGNORE =
+            ConfigOptions.key("dirty.ignore")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to ignore the dirty data, default value is 'false'");
+    public static final ConfigOption<String> DIRTY_IDENTIFIER =
+            ConfigOptions.key("dirty.identifier")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The identifier of dirty data, "
+                                    + "it will be used for filename generation of file dirty sink, "
+                                    + "topic generation of mq dirty sink, tablename generation of database, etc., "
+                                    + "and it supports variable replace like '${variable}'. "
+                                    + "Two system variables [SYSTEM_TIME|DIRTY_TYPE] are currently supported, "
+                                    + "and the support of other variables is determined by the connector.");
+    public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_ENABLE =
+            ConfigOptions.key("dirty.side-output.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to support dirty data side-output, default value is 'false'");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_CONNECTOR =
+            ConfigOptions.key("dirty.side-output.connector")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The connector of dirty side-output");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FORMAT =
+            ConfigOptions.key("dirty.side-output.format")
+                    .stringType()
+                    .defaultValue("csv")
+                    .withDescription(
+                            "The format of dirty side-output, only [csv|json] are supported for now and default value is 'csv'");
+    public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_IGNORE_ERRORS =
+            ConfigOptions.key("dirty.side-output.ignore-errors")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to ignore the dirty side-output errors, default value is 'true'.");
+    public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_LOG_ENABLE =
+            ConfigOptions.key("dirty.side-output.log.enable")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether to enable log print, default value is 'true'.");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LABELS =
+            ConfigOptions.key("dirty.side-output.labels")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The labels of dirty side-output, format is 'key1=value1&key2=value2', "
+                                    + "it supports variable replace like '${variable}'. "
+                                    + "Two system variables [SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + " and the support of other variables is determined by the connector.");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LOG_TAG =
+            ConfigOptions.key("dirty.side-output.log-tag")
+                    .stringType()
+                    .defaultValue("DirtyData")
+                    .withDescription(
+                            "The log tag of dirty side-output, it supports variable replace like '${variable}'. "
+                                    + "Two system variables [SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + " and the support of other variables is determined by the connector.");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER =
+            ConfigOptions.key("dirty.side-output.field-delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription("The field-delimiter of dirty side-output");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LINE_DELIMITER =
+            ConfigOptions.key("dirty.side-output.line-delimiter")
+                    .stringType()
+                    .defaultValue("\n")
+                    .withDescription("The line-delimiter of dirty side-output");
+    public static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_BATCH_SIZE = ConfigOptions
+            .key("dirty.side-output.batch.size")
+            .intType()
+            .defaultValue(100)
+            .withDescription(
+                    "The flush max size, over this number of records, will flush data. The default value is 100.");
+    public static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_RETRIES = ConfigOptions
+            .key("dirty.side-output.retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("The retry times if writing records failed.");
+    public static final ConfigOption<Long> DIRTY_SIDE_OUTPUT_BATCH_INTERVAL = ConfigOptions
+            .key("dirty.side-output.batch.interval")
+            .longType()
+            .defaultValue(60000L)
+            .withDescription(
+                    "The flush interval in milliseconds, over this time, "
+                            + "asynchronous threads will flush data. The default value is 60s.");
+    public static final ConfigOption<Long> DIRTY_SIDE_OUTPUT_BATCH_BYTES = ConfigOptions
+            .key("dirty.side-output.batch.bytes")
+            .longType()
+            .defaultValue(10240L)
+            .withDescription(
+                    "The flush max bytes, over this number in batch, will flush data. The default value is 10KB.");
}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
new file mode 100644
index 000000000..a8b84f2b4
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.inlong.sort.base.util.PatternReplaceUtils;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Dirty data base class, a wrapper of the dirty data together with its identifier, labels, log tag and type.
+ *
+ * @param <T> The type of the wrapped dirty data
+ */
+public class DirtyData<T> {
+
+    private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+    private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
+    private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    /**
+     * The identifier of dirty data, it will be used for filename generation of file dirty sink,
+     * topic generation of mq dirty sink, table name generation of database dirty sink, etc.
+     * It supports variable replacement like '${variable}'; two system variables
+     * [SYSTEM_TIME|DIRTY_TYPE] are currently supported, and the support of other
+     * variables is determined by the connector.
+     */
+    private final String identifier;
+    /**
+     * The labels of the dirty data with variables replaced, they will be written to the dirty store
+     */
+    private final String labels;
+    /**
+     * The log tag of dirty data with variables replaced, it is only used to format logs as follows:
+     * [${logTag}] ${labels} ${data}
+     */
+    private final String logTag;
+    /**
+     * The dirty type, never null: a null value passed in falls back to {@link DirtyType#UNDEFINED}
+     */
+    private final DirtyType dirtyType;
+    /**
+     * The real dirty data
+     */
+    private final T data;
+
+    // Variables are replaced once, at construction time. A null dirty type falls back to UNDEFINED
+    // so that '${DIRTY_TYPE}' can always be resolved instead of failing with a NullPointerException.
+    public DirtyData(T data, String identifier, String labels, String logTag, DirtyType dirtyType) {
+        this.data = data;
+        this.dirtyType = dirtyType == null ? DirtyType.UNDEFINED : dirtyType;
+        Map<String, String> paramMap = genParamMap();
+        this.labels = PatternReplaceUtils.replace(labels, paramMap);
+        this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
+        this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
+    }
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    private Map<String, String> genParamMap() {
+        Map<String, String> paramMap = new HashMap<>();
+        paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        paramMap.put(DIRTY_TYPE_KEY, dirtyType.format());
+        return paramMap;
+    }
+
+    public String getLabels() {
+        return labels;
+    }
+
+    public String getLogTag() {
+        return logTag;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public DirtyType getDirtyType() {
+        return dirtyType;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public static class Builder<T> {
+
+        private String identifier;
+        private String labels;
+        private String logTag;
+        private DirtyType dirtyType = DirtyType.UNDEFINED;
+        private T data;
+
+        public Builder<T> setDirtyType(DirtyType dirtyType) {
+            this.dirtyType = dirtyType;
+            return this;
+        }
+
+        public Builder<T> setLabels(String labels) {
+            this.labels = labels;
+            return this;
+        }
+
+        public Builder<T> setData(T data) {
+            this.data = data;
+            return this;
+        }
+
+        public Builder<T> setLogTag(String logTag) {
+            this.logTag = logTag;
+            return this;
+        }
+
+        public Builder<T> setIdentifier(String identifier) {
+            this.identifier = identifier;
+            return this;
+        }
+
+        public DirtyData<T> build() {
+            return new DirtyData<>(data, identifier, labels, logTag, dirtyType);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
new file mode 100644
index 000000000..652328e47
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import java.io.Serializable;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IGNORE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG;
+
+/**
+ * Common options that decide whether dirty data is ignored and whether/how it is side-output.
+ */
+public class DirtyOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final boolean ignoreDirty;
+    private final boolean enableDirtySideOutput;
+    private final boolean ignoreSideOutputErrors;
+    private final String dirtyConnector;
+    private final String labels;
+    private final String logTag;
+    private final String identifier;
+
+    private DirtyOptions(boolean ignore, boolean sideOutputEnabled, boolean sideOutputErrorsIgnored,
+            String connector, String sideOutputLabels, String tag, String id) {
+        this.identifier = id;
+        this.logTag = tag;
+        this.labels = sideOutputLabels;
+        this.dirtyConnector = connector;
+        this.ignoreSideOutputErrors = sideOutputErrorsIgnored;
+        this.enableDirtySideOutput = sideOutputEnabled;
+        this.ignoreDirty = ignore;
+    }
+
+    /**
+     * Reads the dirty options from the given {@link ReadableConfig}.
+     *
+     * @param config The config holding the 'dirty.*' options
+     * @return The dirty options read from the config
+     */
+    public static DirtyOptions fromConfig(ReadableConfig config) {
+        final boolean ignore = config.get(DIRTY_IGNORE);
+        final boolean sideOutputEnabled = config.get(DIRTY_SIDE_OUTPUT_ENABLE);
+        final boolean sideOutputErrorsIgnored = config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
+        final String connector = config.getOptional(DIRTY_SIDE_OUTPUT_CONNECTOR).orElse(null);
+        final String sideOutputLabels = config.getOptional(DIRTY_SIDE_OUTPUT_LABELS).orElse(null);
+        final String tag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG);
+        final String id = config.get(DIRTY_IDENTIFIER);
+        return new DirtyOptions(ignore, sideOutputEnabled, sideOutputErrorsIgnored,
+                connector, sideOutputLabels, tag, id);
+    }
+
+    public void validate() {
+        // The connector is only required when dirty data is ignored and side-output is enabled
+        final boolean connectorRequired = ignoreDirty && enableDirtySideOutput;
+        final boolean connectorMissing = dirtyConnector == null || dirtyConnector.trim().isEmpty();
+        if (connectorRequired && connectorMissing) {
+            throw new ValidationException(
+                    "The option 'dirty.side-output.connector' is not allowed to be empty "
+                            + "when the option 'dirty.ignore' is 'true' "
+                            + "and the option 'dirty.side-output.enable' is 'true'");
+        }
+    }
+
+    public boolean ignoreDirty() {
+        return ignoreDirty;
+    }
+
+    public boolean enableDirtySideOutput() {
+        return enableDirtySideOutput;
+    }
+
+    public boolean ignoreSideOutputErrors() {
+        return ignoreSideOutputErrors;
+    }
+
+    public String getDirtyConnector() {
+        return dirtyConnector;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public String getLabels() {
+        return labels;
+    }
+
+    public String getLogTag() {
+        return logTag;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
new file mode 100644
index 000000000..32f35874d
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+/**
+ * Dirty type enum, it describes why a piece of data is regarded as dirty
+ */
+public enum DirtyType {
+
+    /**
+     * Field mapping error, it refers to a field mapping error between the source and the sink,
+     * or between the flink system and the external system.
+     * For example, the flink system contains more fields
+     * than the number of fields written to the external system.
+     */
+    FIELD_MAPPING_ERROR("FieldMappingError"),
+    /**
+     * Data type mapping error, it refers to a data type mismatch between the source and the sink,
+     * or between the flink system and the external system.
+     */
+    DATA_TYPE_MAPPING_ERROR("DataTypeMappingError"),
+    /**
+     * Deserialize error, it often occurs at a source side that requires decoding, such as kafka.
+     */
+    DESERIALIZE_ERROR("DeserializeError"),
+    /**
+     * Serialize error, it occurs at a sink side that requires encoding, such as kafka.
+     */
+    SERIALIZE_ERROR("SerializeError"),
+    /**
+     * Key deserialize error; deserialize errors often occur at a source side that requires decoding,
+     * such as kafka.
+     */
+    KEY_DESERIALIZE_ERROR("KeyDeserializeError"),
+    /**
+     * Key serialize error, it occurs at a sink side that requires encoding, such as kafka.
+     */
+    KEY_SERIALIZE_ERROR("KeySerializeError"),
+    /**
+     * Value deserialize error; deserialize errors often occur at a source side that requires decoding,
+     * such as kafka.
+     */
+    VALUE_DESERIALIZE_ERROR("ValueDeserializeError"),
+    /**
+     * Value serialize error, it occurs at a sink side that requires encoding, such as kafka.
+     */
+    VALUE_SERIALIZE_ERROR("ValueSerializeError"),
+    /**
+     * Undefined dirty type
+     */
+    UNDEFINED("Undefined");
+
+    private final String displayName; // the value returned by format()
+
+    DirtyType(String displayName) {
+        this.displayName = displayName;
+    }
+
+    public String format() {
+        return displayName;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySink.java
new file mode 100644
index 000000000..cd60123a1
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+
+import java.io.Serializable;
+
+/**
+ * The base interface of dirty sinks, which are used to write dirty data.
+ *
+ * @param <T> The type of the dirty data handled by this sink
+ */
+public interface DirtySink<T> extends Serializable {
+
+    /**
+     * Opens the dirty sink.
+     *
+     * @param configuration The configuration used to open the dirty sink
+     * @throws Exception If the dirty sink fails to open
+     */
+    default void open(Configuration configuration) throws Exception {
+        // No-op by default
+    }
+
+    /**
+     * Writes one piece of dirty data.
+     *
+     * @param dirtyData The dirty data to write
+     * @throws Exception If the dirty data cannot be written
+     */
+    void invoke(DirtyData<T> dirtyData) throws Exception;
+
+    /**
+     * Closes the dirty sink.
+     *
+     * @throws Exception If the dirty sink fails to close
+     */
+    default void close() throws Exception {
+        // No-op by default
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
new file mode 100644
index 000000000..b6725ddd8
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink;
+
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Factory of {@link DirtySink}, it creates a dirty sink from a dynamic table factory context.
+ */
+public interface DirtySinkFactory extends DynamicTableFactory {
+
+    /**
+     * Creates a dirty sink.
+     *
+     * @param context The dynamic table factory context the dirty sink is created from
+     * @param <T> The type of the dirty data handled by the created dirty sink
+     * @return The created dirty sink
+     */
+    <T> DirtySink<T> createDirtySink(Context context);
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
new file mode 100644
index 000000000..bf5a4f135
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.log;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Log dirty sink that prints dirty data to the log in 'csv' or 'json' format.
+ *
+ * @param <T> The type of the dirty data handled by this sink
+ */
+public class LogDirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogDirtySink.class);
+
+    private final RowData.FieldGetter[] fieldGetters;
+    private final String format;
+    private final String fieldDelimiter;
+    private final DataType physicalRowDataType;
+    private RowDataToJsonConverter converter; // created in open(), used by the 'json' format
+
+    public LogDirtySink(String format, String fieldDelimiter, DataType physicalRowDataType) {
+        this.format = format;
+        this.fieldDelimiter = fieldDelimiter;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] fieldTypes = physicalRowDataType.getChildren().stream()
+                .map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[fieldTypes.length];
+        for (int pos = 0; pos < fieldTypes.length; pos++) {
+            this.fieldGetters[pos] = createFieldGetter(fieldTypes[pos], pos);
+        }
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        this.converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+                .createConverter(physicalRowDataType.getLogicalType());
+    }
+
+    @Override
+    public void invoke(DirtyData<T> dirtyData) throws Exception {
+        final Map<String, String> labels = LabelUtils.parseLabels(dirtyData.getLabels());
+        final T payload = dirtyData.getData();
+        final String line;
+        if (payload instanceof RowData) {
+            line = format((RowData) payload, labels);
+        } else if (payload instanceof JsonNode) {
+            line = format((JsonNode) payload, labels);
+        } else {
+            // Data that is neither a 'RowData' nor a 'JsonNode' can only be formatted as csv
+            line = FormatUtils.csvFormat(payload, labels, fieldDelimiter);
+        }
+        LOG.info("[{}] {}", dirtyData.getLogTag(), line);
+    }
+
+    /**
+     * Formats a {@link RowData} with the configured format.
+     *
+     * @throws UnsupportedOperationException if the configured format is neither 'csv' nor 'json'
+     */
+    private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
+        if (format.equals("csv")) {
+            return FormatUtils.csvFormat(data, fieldGetters, labels, fieldDelimiter);
+        }
+        if (format.equals("json")) {
+            return FormatUtils.jsonFormat(data, converter, labels);
+        }
+        throw new UnsupportedOperationException(
+                String.format("Unsupported format for: %s", format));
+    }
+
+    /**
+     * Formats a {@link JsonNode} with the configured format.
+     *
+     * @throws UnsupportedOperationException if the configured format is neither 'csv' nor 'json'
+     */
+    private String format(JsonNode data, Map<String, String> labels) throws JsonProcessingException {
+        if (format.equals("csv")) {
+            return FormatUtils.csvFormat(data, labels, fieldDelimiter);
+        }
+        if (format.equals("json")) {
+            return FormatUtils.jsonFormat(data, labels);
+        }
+        throw new UnsupportedOperationException(
+                String.format("Unsupported format for: %s", format));
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
new file mode 100644
index 000000000..93a12f584
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.log;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+
+/**
+ * Log dirty sink factory, it creates a {@link LogDirtySink} and is identified by 'log'.
+ */
+public class LogDirtySinkFactory implements DirtySinkFactory {
+
+    private static final String IDENTIFIER = "log";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return new HashSet<>();
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> optional = new HashSet<>();
+        optional.add(DIRTY_SIDE_OUTPUT_FORMAT);
+        optional.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        return optional;
+    }
+
+    @Override
+    public <T> DirtySink<T> createDirtySink(Context context) {
+        final FactoryUtil.TableFactoryHelper tableHelper = FactoryUtil.createTableFactoryHelper(this, context);
+        FactoryUtil.validateFactoryOptions(this, tableHelper.getOptions());
+        final String sinkFormat = tableHelper.getOptions().get(DIRTY_SIDE_OUTPUT_FORMAT);
+        final String delimiter = tableHelper.getOptions().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        return new LogDirtySink<>(sinkFormat, delimiter,
+                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
new file mode 100644
index 000000000..3220837dd
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.utils;
+
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.StringJoiner;
+
+/**
+ * Format utils
+ */
+public final class FormatUtils {
+
+ /**
+ * The value for nullable field that is used for 'csv' format
+ */
+ private static final String NULL_VALUE = "null";
+ /**
+ * The default value of field delimiter that is used for 'csv' format
+ */
+ private static final String DEFAULT_FIELD_DELIMITER = ",";
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+ private static final ObjectNode reuse = MAPPER.createObjectNode();
+
+ private FormatUtils() {
+ }
+
+ /**
+ * Csv format for 'RowData'
+ *
+ * @param data The data wrapper with 'RowData'
+ * @param fieldGetters The field getters of 'RowData'
+ * @param labels The labels of dirty sink
+ * @param fieldDelimiter The field delimiter
+ * @return The value after format
+ */
+ public static String csvFormat(RowData data, RowData.FieldGetter[] fieldGetters,
+ Map<String, String> labels, String fieldDelimiter) {
+ StringJoiner result = csvFormatForLabels(labels, fieldDelimiter);
+ for (int i = 0; i < data.getArity(); i++) {
+ Object value = fieldGetters[i].getFieldOrNull(data);
+ result.add(value != null ? value.toString() : NULL_VALUE);
+ }
+ return result.toString();
+ }
+
+ /**
+ * Csv format for 'JsonNode'
+ *
+ * @param data The data wrapper with 'JsonNode'
+ * @param labels The labels of dirty sink
+ * @param fieldDelimiter The field delimiter
+ * @return The value after format
+ */
+ public static String csvFormat(JsonNode data, Map<String, String> labels, String fieldDelimiter) {
+ StringJoiner result = csvFormatForLabels(labels, fieldDelimiter);
+ Iterator<Entry<String, JsonNode>> iterator = data.fields();
+ while (iterator.hasNext()) {
+ Entry<String, JsonNode> kv = iterator.next();
+ result.add(kv.getValue() != null ? kv.getValue().asText() : NULL_VALUE);
+ }
+ return result.toString();
+ }
+
+ /**
+ * Csv format for 'Object'
+ *
+ * @param data Any object
+ * @param labels The labels of dirty sink
+ * @param fieldDelimiter The field delimiter
+ * @return The value after format
+ */
+ public static String csvFormat(Object data, Map<String, String> labels, String fieldDelimiter) {
+ StringJoiner result = csvFormatForLabels(labels, fieldDelimiter);
+ result.add(data == null ? NULL_VALUE : data.toString());
+ return result.toString();
+ }
+
+ /**
+ * Csv format for labels
+ *
+ * @param labels The labels of dirty sink
+ * @param result The result that store the value after format
+ * @return The value after format
+ */
+ public static StringJoiner csvFormatForLabels(Map<String, String> labels, StringJoiner result) {
+ if (labels == null || labels.isEmpty()) {
+ return result;
+ }
+ for (Entry<String, String> kv : labels.entrySet()) {
+ result.add(kv.getValue() != null ? kv.getValue() : NULL_VALUE);
+ }
+ return result;
+ }
+
+ /**
+ * Csv format for labels
+ *
+ * @param labels The labels of dirty sink
+ * @param fieldDelimiter The field delimiter
+ * @return The value after format
+ */
+ public static StringJoiner csvFormatForLabels(Map<String, String> labels, String fieldDelimiter) {
+ return csvFormatForLabels(labels,
+ new StringJoiner(fieldDelimiter == null ? DEFAULT_FIELD_DELIMITER : fieldDelimiter));
+ }
+
+ /**
+ * Json format for 'RowData'
+ *
+ * @param data The data wrapper with 'RowData'
+ * @param converter The converter of 'RowData'
+ * @param labels The labels of dirty sink
+ * @return The value after format
+ * @throws JsonProcessingException The exception may be thrown when executing
+ */
+ public static String jsonFormat(RowData data, RowDataToJsonConverter converter,
+ Map<String, String> labels) throws JsonProcessingException {
+ return jsonFormat(converter.convert(MAPPER, reuse, data), labels);
+ }
+
+ /**
+ * Json format for 'JsonNode'
+ *
+ * @param data The data wrapper with 'JsonNode'
+ * @param labels The labels of dirty sink
+ * @return The value after format
+ * @throws JsonProcessingException The exception may be thrown when executing
+ */
+ public static String jsonFormat(JsonNode data, Map<String, String> labels)
+ throws JsonProcessingException {
+ Map<String, Object> result = new LinkedHashMap<>(labels);
+ Iterator<Entry<String, JsonNode>> iterator = data.fields();
+ while (iterator.hasNext()) {
+ Entry<String, JsonNode> kv = iterator.next();
+ result.put(kv.getKey(), kv.getValue());
+ }
+ return MAPPER.writeValueAsString(result);
+ }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/LabelUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/LabelUtils.java
new file mode 100644
index 000000000..24e0c5ad7
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/LabelUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.KEY_VALUE_DELIMITER;
+
+/**
+ * The lable utils class, it is used for parse the lables to a label map
+ */
+public final class LabelUtils {
+
+ private LabelUtils() {
+ }
+
+ /**
+ * Parse the labels to label map
+ *
+ * @param labels The labels format by 'key1=value1&key2=value2...'
+ * @return The label map of labels
+ */
+ public static Map<String, String> parseLabels(String labels) {
+ return parseLabels(labels, new LinkedHashMap<>());
+ }
+
+ /**
+ * Parse the labels to label map
+ *
+ * @param labels The labels format by 'key1=value1&key2=value2...'
+ * @return The label map of labels
+ */
+ public static Map<String, String> parseLabels(String labels, Map<String, String> labelMap) {
+ if (labelMap == null) {
+ labelMap = new LinkedHashMap<>();
+ }
+ if (labels == null || labels.length() == 0) {
+ return labelMap;
+ }
+ String[] labelArray = labels.split(DELIMITER);
+ for (String label : labelArray) {
+ int index = label.indexOf(KEY_VALUE_DELIMITER);
+ if (index < 1 || index == label.length() - 1) {
+ throw new IllegalArgumentException("The format of labels must be like 'key1=value1&key2=value2...'");
+ }
+ labelMap.put(label.substring(0, index), label.substring(index + 1));
+ }
+ return labelMap;
+ }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/PatternReplaceUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/PatternReplaceUtils.java
new file mode 100644
index 000000000..7a6938dae
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/PatternReplaceUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
/**
 * The pattern replace utils. They replace {@code ${key}} placeholders in a string with values from
 * a parameter map.
 */
public final class PatternReplaceUtils {

    /**
     * Matches placeholders such as {@code ${key}} or {@code ${ key }}. A key consists of word
     * characters, '.' and '-'.
     */
    private static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}",
            Pattern.CASE_INSENSITIVE);

    private PatternReplaceUtils() {
    }

    /**
     * Replace every placeholder in {@code pattern} with its value from {@code params}.
     * <p>
     * A placeholder whose key has no non-null value in {@code params} (or when {@code params} is
     * null) is replaced by the bare key. Values are inserted literally, so '$' and '\' in a value
     * need no escaping.
     *
     * @param pattern The string that may contain placeholders, may be null
     * @param params The placeholder values keyed by placeholder name, may be null
     * @return The string after replacement, or null when {@code pattern} is null
     */
    public static String replace(String pattern, Map<String, String> params) {
        if (pattern == null) {
            return null;
        }
        Matcher matcher = REGEX_PATTERN.matcher(pattern);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            String keyText = matcher.group(1);
            String value = params == null ? null : params.get(keyText);
            String replacement = value == null ? keyText : value;
            // appendReplacement treats '$' as a group reference and '\' as an escape, so quote the
            // value to emit it verbatim (otherwise e.g. "$5" throws "No group 5").
            matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(sb);
        return sb.toString();
    }
}
diff --git a/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..412dedf67
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
new file mode 100644
index 000000000..386766ddd
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Test for {@link org.apache.inlong.sort.base.dirty.utils.FormatUtils}
+ */
+public class FormatUtilsTest {
+
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ private JsonNode jsonNode;
+ private RowData rowData;
+ private FieldGetter[] fieldGetters;
+ private Map<String, String> labelMap;
+ private ResolvedSchema schema;
+
+ @Before
+ public void init() throws JsonProcessingException {
+ schema = ResolvedSchema.of(
+ Column.physical("id", DataTypes.BIGINT()),
+ Column.physical("name", DataTypes.STRING()),
+ Column.physical("age", DataTypes.INT()));
+ List<LogicalType> logicalTypes = schema.toPhysicalRowDataType()
+ .getChildren().stream().map(DataType::getLogicalType).collect(Collectors.toList());
+ fieldGetters = new RowData.FieldGetter[logicalTypes.size()];
+ for (int i = 0; i < logicalTypes.size(); i++) {
+ fieldGetters[i] = createFieldGetter(logicalTypes.get(i), i);
+ }
+ rowData = GenericRowData.of(1L, StringData.fromString("leo"), 18);
+ String jsonStr = "{\"id\":1,\"name\":\"leo\",\"age\":18}";
+ jsonNode = mapper.readValue(jsonStr, JsonNode.class);
+ labelMap = new HashMap<>();
+ labelMap.put("database", "inlong");
+ labelMap.put("table", "student");
+ }
+
+ @Test
+ public void testCsvFormat() {
+ String expected = "inlong,student,1,leo,18";
+ Assert.assertEquals(expected, FormatUtils.csvFormat(jsonNode, labelMap, ","));
+ Assert.assertEquals(expected, FormatUtils.csvFormat(rowData, fieldGetters, labelMap, ","));
+ Assert.assertEquals(expected, FormatUtils.csvFormat("1,leo,18", labelMap, ","));
+ }
+
+ @Test
+ public void testJsonFormat() throws JsonProcessingException {
+ RowDataToJsonConverter converter = new RowDataToJsonConverters(TimestampFormat.SQL,
+ MapNullKeyMode.DROP, null)
+ .createConverter(schema.toPhysicalRowDataType().getLogicalType());
+ String expectedStr = "{\"database\":\"inlong\",\"table\":\"student\",\"id\":1,\"name\":\"leo\",\"age\":18}";
+ JsonNode expected = mapper.readValue(expectedStr, JsonNode.class);
+ String actual = FormatUtils.jsonFormat(jsonNode, labelMap);
+ Assert.assertEquals(expected, mapper.readValue(actual, JsonNode.class));
+ actual = FormatUtils.jsonFormat(rowData, converter, labelMap);
+ Assert.assertEquals(expected, mapper.readValue(actual, JsonNode.class));
+ }
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/LabelUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/LabelUtilsTest.java
new file mode 100644
index 000000000..0f7046f5c
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/LabelUtilsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Test for {@link LabelUtils}
+ */
+public class LabelUtilsTest {
+
+ @Test
+ public void testParseLabels() {
+ String labels = "database=inlong&table=student";
+ Map<String, String> expected = new LinkedHashMap<>();
+ expected.put("database", "inlong");
+ expected.put("table", "student");
+ Assert.assertEquals(expected, LabelUtils.parseLabels(labels));
+ }
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/PatternReplaceUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/PatternReplaceUtilsTest.java
new file mode 100644
index 000000000..307c215c0
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/PatternReplaceUtilsTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test for {@link PatternReplaceUtils}
+ */
+public class PatternReplaceUtilsTest {
+
+ @Test
+ public void testParseLabels() {
+ String labels = "database=${database}&table=${table}";
+ Map<String, String> params = new HashMap<>();
+ params.put("database", "inlong");
+ params.put("table", "student");
+ String expected = "database=inlong&table=student";
+ Assert.assertEquals(expected, PatternReplaceUtils.replace(labels, params));
+ }
+}