You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/28 09:54:25 UTC

[inlong] branch master updated: [INLONG-6617][Sort] Add common process for dirty data sink and supports log sink (#6618)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ff173854f [INLONG-6617][Sort] Add common process for dirty data sink and supports log sink (#6618)
ff173854f is described below

commit ff173854fcd4f2c11211be13923f809bb799495c
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Mon Nov 28 17:54:19 2022 +0800

    [INLONG-6617][Sort] Add common process for dirty data sink and supports log sink (#6618)
---
 .../org/apache/inlong/sort/base/Constants.java     | 103 ++++++++++++-
 .../apache/inlong/sort/base/dirty/DirtyData.java   | 143 ++++++++++++++++++
 .../inlong/sort/base/dirty/DirtyOptions.java       | 115 +++++++++++++++
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  77 ++++++++++
 .../inlong/sort/base/dirty/sink/DirtySink.java     |  59 ++++++++
 .../sort/base/dirty/sink/DirtySinkFactory.java     |  36 +++++
 .../sort/base/dirty/sink/log/LogDirtySink.java     | 122 +++++++++++++++
 .../base/dirty/sink/log/LogDirtySinkFactory.java   |  64 ++++++++
 .../inlong/sort/base/dirty/utils/FormatUtils.java  | 164 +++++++++++++++++++++
 .../apache/inlong/sort/base/util/LabelUtils.java   |  66 +++++++++
 .../inlong/sort/base/util/PatternReplaceUtils.java |  46 ++++++
 .../org.apache.flink.table.factories.Factory       |  16 ++
 .../inlong/sort/base/dirty/FormatUtilsTest.java    | 100 +++++++++++++
 .../inlong/sort/base/util/LabelUtilsTest.java      |  39 +++++
 .../sort/base/util/PatternReplaceUtilsTest.java    |  40 +++++
 15 files changed, 1189 insertions(+), 1 deletion(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index d99fc8012..0b91fcfce 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -74,9 +74,13 @@ public final class Constants {
      */
     public static final String NODE_ID = "nodeId";
     /**
-     * It is used for inlong.metric
+     * It is used for 'inlong.metric.labels' or 'dirty.side-output.labels'
      */
     public static final String DELIMITER = "&";
+    /**
+     * The delimiter of key and value, it is used for 'inlong.metric.labels' or 'dirty.side-output.labels'
+     */
+    public static final String KEY_VALUE_DELIMITER = "=";
 
     // sort received successfully
     public static final Integer AUDIT_SORT_INPUT = 7;
@@ -177,4 +181,101 @@ public final class Constants {
                     .defaultValue(false)
                     .withDescription("Because spark do not support iceberg data type: `timestamp without time zone` and"
                             + "`time`, so type conversions must be mapped to types supported by spark.");
+
+    // ========================================= dirty configuration =========================================
+    public static final String DIRTY_PREFIX = "dirty.";
+
+    public static final ConfigOption<Boolean> DIRTY_IGNORE =
+            ConfigOptions.key("dirty.ignore")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether ignore the dirty data, default value is 'false'");
+    public static final ConfigOption<String> DIRTY_IDENTIFIER =
+            ConfigOptions.key("dirty.identifier")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The identifier of dirty data, "
+                                    + "it will be used for filename generation of file dirty sink, "
+                                    + "topic generation of mq dirty sink, tablename generation of database, etc., "
+                                    + "and it supports variable replace like '${variable}'. "
+                                    + "There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + " and the support of other variables is determined by the connector.");
+    public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_ENABLE =
+            ConfigOptions.key("dirty.side-output.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether supports dirty data side-output, default value is 'false'");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_CONNECTOR =
+            ConfigOptions.key("dirty.side-output.connector")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The connector of dirty side-output");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FORMAT =
+            ConfigOptions.key("dirty.side-output.format")
+                    .stringType()
+                    .defaultValue("csv")
+                    .withDescription(
+                            "The format of dirty side-output, only support [csv|json] for now and default value is 'csv'");
+    public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_IGNORE_ERRORS =
+            ConfigOptions.key("dirty.side-output.ignore-errors")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether ignore the dirty side-output errors, default value is 'true'.");
+    public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_LOG_ENABLE =
+            ConfigOptions.key("dirty.side-output.log.enable")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("Whether enable log print, default value is 'true'.");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LABELS =
+            ConfigOptions.key("dirty.side-output.labels")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The labels of dirty side-output, format is 'key1=value1&key2=value2', "
+                                    + "it supports variable replace like '${variable}'. "
+                                    + "There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + " and the support of other variables is determined by the connector.");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LOG_TAG =
+            ConfigOptions.key("dirty.side-output.log-tag")
+                    .stringType()
+                    .defaultValue("DirtyData")
+                    .withDescription(
+                            "The log tag of dirty side-output, it supports variable replace like '${variable}'. "
+                                    + "There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + " and the support of other variables is determined by the connector.");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER =
+            ConfigOptions.key("dirty.side-output.field-delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription("The field-delimiter of dirty side-output");
+    public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LINE_DELIMITER =
+            ConfigOptions.key("dirty.side-output.line-delimiter")
+                    .stringType()
+                    .defaultValue("\n")
+                    .withDescription("The line-delimiter of dirty sink");
+    public static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_BATCH_SIZE = ConfigOptions
+            .key("dirty.side-output.batch.size")
+            .intType()
+            .defaultValue(100)
+            .withDescription(
+                    "The flush max size, over this number of records, will flush data. The default value is 100.");
+    public static final ConfigOption<Integer> DIRTY_SIDE_OUTPUT_RETRIES = ConfigOptions
+            .key("dirty.side-output.retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("The retry times if writing records failed.");
+    public static final ConfigOption<Long> DIRTY_SIDE_OUTPUT_BATCH_INTERVAL = ConfigOptions
+            .key("dirty.side-output.batch.interval")
+            .longType()
+            .defaultValue(60000L)
+            .withDescription(
+                    "The flush interval in milliseconds, over this time, "
+                            + "asynchronous threads will flush data. The default value is 60s.");
+    public static final ConfigOption<Long> DIRTY_SIDE_OUTPUT_BATCH_BYTES = ConfigOptions
+            .key("dirty.side-output.batch.bytes")
+            .longType()
+            .defaultValue(10240L)
+            .withDescription(
+                    "The flush max bytes, over this number in batch, will flush data. The default value is 10KB.");
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
new file mode 100644
index 000000000..a8b84f2b4
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.inlong.sort.base.util.PatternReplaceUtils;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Wrapper of dirty data that carries the data with its identifier, labels, log tag and dirty type
+ *
+ * @param <T> The type of the real dirty data
+ */
+public class DirtyData<T> {
+
+    private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+
+    private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
+
+    private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    /**
+     * The identifier of dirty data, it will be used for filename generation of file dirty sink,
+     * topic generation of mq dirty sink, tablename generation of database, etc,
+     * and it supports variable replace like '${variable}'.
+     * Two system variables [SYSTEM_TIME|DIRTY_TYPE] are currently supported,
+     * and the support of other variables is determined by the connector.
+     */
+    private final String identifier;
+    /**
+     * The labels of the dirty data, they will be written to the storage system of dirty data
+     */
+    private final String labels;
+    /**
+     * The log tag of dirty data, it is only used to format log as follows:
+     * [${logTag}] ${labels} ${data}
+     */
+    private final String logTag;
+    /**
+     * Dirty type
+     */
+    private final DirtyType dirtyType;
+    /**
+     * The real dirty data
+     */
+    private final T data;
+
+    public DirtyData(T data, String identifier, String labels, String logTag, DirtyType dirtyType) {
+        this.data = data;
+        this.dirtyType = dirtyType;
+        Map<String, String> paramMap = genParamMap(); // must run after dirtyType is assigned, it reads the field
+        this.labels = PatternReplaceUtils.replace(labels, paramMap);
+        this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
+        this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
+    }
+
+    public static <T> Builder<T> builder() {
+        return new Builder<>();
+    }
+
+    private Map<String, String> genParamMap() {
+        Map<String, String> paramMap = new HashMap<>();
+        paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        paramMap.put(DIRTY_TYPE_KEY, dirtyType.format());
+        return paramMap;
+    }
+
+    public String getLabels() {
+        return labels;
+    }
+
+    public String getLogTag() {
+        return logTag;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public DirtyType getDirtyType() {
+        return dirtyType;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public static class Builder<T> {
+
+        private String identifier;
+        private String labels;
+        private String logTag;
+        private DirtyType dirtyType = DirtyType.UNDEFINED;
+        private T data;
+
+        public Builder<T> setDirtyType(DirtyType dirtyType) {
+            this.dirtyType = dirtyType;
+            return this;
+        }
+
+        public Builder<T> setLabels(String labels) {
+            this.labels = labels;
+            return this;
+        }
+
+        public Builder<T> setData(T data) {
+            this.data = data;
+            return this;
+        }
+
+        public Builder<T> setLogTag(String logTag) {
+            this.logTag = logTag;
+            return this;
+        }
+
+        public Builder<T> setIdentifier(String identifier) {
+            this.identifier = identifier;
+            return this;
+        }
+
+        public DirtyData<T> build() {
+            return new DirtyData<>(data, identifier, labels, logTag, dirtyType);
+        }
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
new file mode 100644
index 000000000..652328e47
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyOptions.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+
+import java.io.Serializable;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_IGNORE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_ENABLE;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LABELS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG;
+
+/**
+ * Common options of dirty data handling, they are read from a {@link ReadableConfig}
+ */
+public class DirtyOptions implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final boolean ignoreDirty;
+    private final boolean enableDirtySideOutput;
+    private final boolean ignoreSideOutputErrors;
+    private final String dirtyConnector;
+    private final String labels;
+    private final String logTag;
+    private final String identifier;
+
+    private DirtyOptions(boolean ignoreDirty, boolean enableDirtySideOutput, boolean ignoreSideOutputErrors,
+            String dirtyConnector, String labels, String logTag, String identifier) {
+        this.ignoreDirty = ignoreDirty;
+        this.enableDirtySideOutput = enableDirtySideOutput;
+        this.ignoreSideOutputErrors = ignoreSideOutputErrors;
+        this.dirtyConnector = dirtyConnector;
+        this.labels = labels;
+        this.logTag = logTag;
+        this.identifier = identifier;
+    }
+
+    /**
+     * Get dirty options from {@link ReadableConfig}
+     *
+     * @param config The config
+     * @return Dirty options
+     */
+    public static DirtyOptions fromConfig(ReadableConfig config) {
+        boolean ignoreDirty = config.get(DIRTY_IGNORE);
+        boolean enableDirtySink = config.get(DIRTY_SIDE_OUTPUT_ENABLE);
+        boolean ignoreSinkError = config.get(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
+        String dirtyConnector = config.getOptional(DIRTY_SIDE_OUTPUT_CONNECTOR).orElse(null);
+        String labels = config.getOptional(DIRTY_SIDE_OUTPUT_LABELS).orElse(null);
+        String logTag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG);
+        String identifier = config.get(DIRTY_IDENTIFIER);
+        return new DirtyOptions(ignoreDirty, enableDirtySink, ignoreSinkError,
+                dirtyConnector, labels, logTag, identifier);
+    }
+
+    public void validate() {
+        if (!ignoreDirty || !enableDirtySideOutput) { // the connector is only checked when both are 'true'
+            return;
+        }
+        if (dirtyConnector == null || dirtyConnector.trim().length() == 0) {
+            throw new ValidationException(
+                    "The option 'dirty.side-output.connector' is not allowed to be empty "
+                            + "when the option 'dirty.ignore' is 'true' "
+                            + "and the option 'dirty.side-output.enable' is 'true'");
+        }
+    }
+
+    public boolean ignoreDirty() {
+        return ignoreDirty;
+    }
+
+    public String getDirtyConnector() {
+        return dirtyConnector;
+    }
+
+    public String getLabels() {
+        return labels;
+    }
+
+    public String getLogTag() {
+        return logTag;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public boolean ignoreSideOutputErrors() {
+        return ignoreSideOutputErrors;
+    }
+
+    public boolean enableDirtySideOutput() {
+        return enableDirtySideOutput;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
new file mode 100644
index 000000000..32f35874d
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+/**
+ * Dirty type enum
+ */
+public enum DirtyType {
+
+    /**
+     * Field mapping error, it refers to the field mapping error between source
+     * and sink or between the flink system and the external system.
+     * For example, the number of fields contained in the flink system
+     * is greater than the number of fields written to the external system, etc.
+     */
+    FIELD_MAPPING_ERROR("FieldMappingError"),
+    /**
+     * Data type mapping error, it refers to the data type mismatch between the source and the sink
+     * or between the flink system and the external system.
+     */
+    DATA_TYPE_MAPPING_ERROR("DataTypeMappingError"),
+    /**
+     * Deserialize error, it often occurs at the source side that requires decoding, such as kafka, etc.
+     */
+    DESERIALIZE_ERROR("DeserializeError"),
+    /**
+     * Serialize error, it occurs at the sink side that requires encoding, such as kafka, etc.
+     */
+    SERIALIZE_ERROR("SerializeError"),
+    /**
+     * Key deserialize error, it often occurs at the source side that requires decoding, such as kafka,
+     * etc.
+     */
+    KEY_DESERIALIZE_ERROR("KeyDeserializeError"),
+    /**
+     * Key serialize error, it occurs at the sink side that requires encoding, such as kafka, etc.
+     */
+    KEY_SERIALIZE_ERROR("KeySerializeError"),
+    /**
+     * Value deserialize error, it often occurs at the source side that requires decoding, such as kafka,
+     * etc.
+     */
+    VALUE_DESERIALIZE_ERROR("ValueDeserializeError"),
+    /**
+     * Value serialize error, it occurs at the sink side that requires encoding, such as kafka, etc.
+     */
+    VALUE_SERIALIZE_ERROR("ValueSerializeError"),
+    /**
+     * Undefined dirty type
+     */
+    UNDEFINED("Undefined");
+
+    private final String format;
+
+    DirtyType(String format) {
+        this.format = format;
+    }
+
+    public String format() {
+        return format;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySink.java
new file mode 100644
index 000000000..cd60123a1
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySink.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+
+import java.io.Serializable;
+
+/**
+ * The dirty sink base interface
+ *
+ * @param <T> The type of the dirty data that is handled by the sink
+ */
+public interface DirtySink<T> extends Serializable {
+
+    /**
+     * Open for dirty sink
+     *
+     * @param configuration The configuration that is used for dirty sink
+     * @throws Exception The exception may be thrown when executing
+     */
+    default void open(Configuration configuration) throws Exception {
+
+    }
+
+    /**
+     * Invoke that is used to sink dirty data
+     *
+     * @param dirtyData The dirty data that will be written
+     * @throws Exception The exception may be thrown when executing
+     */
+    void invoke(DirtyData<T> dirtyData) throws Exception;
+
+    /**
+     * Close for dirty sink
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    default void close() throws Exception {
+
+    }
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
new file mode 100644
index 000000000..b6725ddd8
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtySinkFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink;
+
+import org.apache.flink.table.factories.DynamicTableFactory;
+
+/**
+ * Dirty sink factory class, it is used to create dirty sink
+ */
+public interface DirtySinkFactory extends DynamicTableFactory {
+
+    /**
+     * Create dirty sink
+     *
+     * @param context The context of the dirty sink
+     * @param <T> The type of data that is handled by the dirty sink
+     * @return A dirty sink
+     */
+    <T> DirtySink<T> createDirtySink(DynamicTableFactory.Context context);
+
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
new file mode 100644
index 000000000..bf5a4f135
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.log;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Log dirty sink that prints the dirty data to the log in 'csv' or 'json' format
+ *
+ * @param <T> The type of the dirty data that is handled by the sink
+ */
+public class LogDirtySink<T> implements DirtySink<T> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(LogDirtySink.class);
+
+    private final RowData.FieldGetter[] fieldGetters;
+    private final String format;
+    private final String fieldDelimiter;
+    private final DataType physicalRowDataType;
+    private RowDataToJsonConverter converter; // created in open(Configuration)
+
+    public LogDirtySink(String format, String fieldDelimiter, DataType physicalRowDataType) {
+        this.format = format;
+        this.fieldDelimiter = fieldDelimiter;
+        this.physicalRowDataType = physicalRowDataType;
+        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
+                .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
+        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+        for (int i = 0; i < logicalTypes.length; i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
+        }
+    }
+
+    @Override
+    public void open(Configuration configuration) throws Exception {
+        converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null)
+                .createConverter(physicalRowDataType.getLogicalType());
+    }
+
+    @Override
+    public void invoke(DirtyData<T> dirtyData) throws Exception {
+        String value;
+        Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
+        T data = dirtyData.getData();
+        if (data instanceof RowData) {
+            value = format((RowData) data, labelMap);
+        } else if (data instanceof JsonNode) {
+            value = format((JsonNode) data, labelMap);
+        } else {
+            // Only the csv format is supported when the data is neither a 'RowData' nor a 'JsonNode'
+            value = FormatUtils.csvFormat(data, labelMap, fieldDelimiter);
+        }
+        LOG.info("[{}] {}", dirtyData.getLogTag(), value);
+    }
+
+    private String format(RowData data, Map<String, String> labels) throws JsonProcessingException {
+        String value;
+        switch (format) {
+            case "csv":
+                value = FormatUtils.csvFormat(data, fieldGetters, labels, fieldDelimiter);
+                break;
+            case "json":
+                value = FormatUtils.jsonFormat(data, converter, labels);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported format for: %s", format));
+        }
+        return value;
+    }
+
+    private String format(JsonNode data, Map<String, String> labels) throws JsonProcessingException {
+        String value;
+        switch (format) {
+            case "csv":
+                value = FormatUtils.csvFormat(data, labels, fieldDelimiter);
+                break;
+            case "json":
+                value = FormatUtils.jsonFormat(data, labels);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported format for: %s", format));
+        }
+        return value;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
new file mode 100644
index 000000000..93a12f584
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.log;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FIELD_DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+
+/**
+ * Log dirty sink factory, it is identified by 'log' and creates the {@link LogDirtySink}
+ */
+public class LogDirtySinkFactory implements DirtySinkFactory {
+
+    private static final String IDENTIFIER = "log";
+
+    @Override
+    public <T> DirtySink<T> createDirtySink(Context context) {
+        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        FactoryUtil.validateFactoryOptions(this, helper.getOptions());
+        String format = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FORMAT);
+        String fieldDelimiter = helper.getOptions().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        return new LogDirtySink<>(format, fieldDelimiter,
+                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return new HashSet<>(); // No option is mandatory for the log sink
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>(); // The format and the field delimiter may be omitted
+        options.add(DIRTY_SIDE_OUTPUT_FORMAT);
+        options.add(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        return options;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
new file mode 100644
index 000000000..3220837dd
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.utils;
+
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.table.data.RowData;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.StringJoiner;
+
+/**
+ * Format utils, it renders the dirty data together with the labels as a 'csv' or a 'json' string
+ */
+public final class FormatUtils {
+
+    /**
+     * The value for nullable field that is used for 'csv' format
+     */
+    private static final String NULL_VALUE = "null";
+    /**
+     * The default value of field delimiter that is used for 'csv' format
+     */
+    private static final String DEFAULT_FIELD_DELIMITER = ",";
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    private FormatUtils() {
+    }
+
+    /**
+     * Csv format for 'RowData'
+     *
+     * @param data The data wrapper with 'RowData'
+     * @param fieldGetters The field getters of 'RowData'
+     * @param labels The labels of dirty sink
+     * @param fieldDelimiter The field delimiter
+     * @return The value after format
+     */
+    public static String csvFormat(RowData data, RowData.FieldGetter[] fieldGetters,
+            Map<String, String> labels, String fieldDelimiter) {
+        StringJoiner result = csvFormatForLabels(labels, fieldDelimiter);
+        for (int i = 0; i < data.getArity(); i++) {
+            Object value = fieldGetters[i].getFieldOrNull(data);
+            result.add(value != null ? value.toString() : NULL_VALUE);
+        }
+        return result.toString();
+    }
+
+    /**
+     * Csv format for 'JsonNode'
+     *
+     * @param data The data wrapper with 'JsonNode'
+     * @param labels The labels of dirty sink
+     * @param fieldDelimiter The field delimiter
+     * @return The value after format
+     */
+    public static String csvFormat(JsonNode data, Map<String, String> labels, String fieldDelimiter) {
+        StringJoiner result = csvFormatForLabels(labels, fieldDelimiter);
+        Iterator<Entry<String, JsonNode>> iterator = data.fields();
+        while (iterator.hasNext()) {
+            Entry<String, JsonNode> kv = iterator.next();
+            result.add(kv.getValue() != null ? kv.getValue().asText() : NULL_VALUE);
+        }
+        return result.toString();
+    }
+
+    /**
+     * Csv format for 'Object'
+     *
+     * @param data Any object
+     * @param labels The labels of dirty sink
+     * @param fieldDelimiter The field delimiter
+     * @return The value after format
+     */
+    public static String csvFormat(Object data, Map<String, String> labels, String fieldDelimiter) {
+        StringJoiner result = csvFormatForLabels(labels, fieldDelimiter);
+        result.add(data == null ? NULL_VALUE : data.toString());
+        return result.toString();
+    }
+
+    /**
+     * Csv format for labels
+     *
+     * @param labels The labels of dirty sink
+     * @param result The result that store the value after format
+     * @return The value after format
+     */
+    public static StringJoiner csvFormatForLabels(Map<String, String> labels, StringJoiner result) {
+        if (labels == null || labels.isEmpty()) {
+            return result;
+        }
+        for (Entry<String, String> kv : labels.entrySet()) {
+            result.add(kv.getValue() != null ? kv.getValue() : NULL_VALUE);
+        }
+        return result;
+    }
+
+    /**
+     * Csv format for labels
+     *
+     * @param labels The labels of dirty sink
+     * @param fieldDelimiter The field delimiter
+     * @return The value after format
+     */
+    public static StringJoiner csvFormatForLabels(Map<String, String> labels, String fieldDelimiter) {
+        return csvFormatForLabels(labels,
+                new StringJoiner(fieldDelimiter == null ? DEFAULT_FIELD_DELIMITER : fieldDelimiter));
+    }
+
+    /**
+     * Json format for 'RowData'
+     *
+     * @param data The data wrapper with 'RowData'
+     * @param converter The converter of 'RowData'
+     * @param labels The labels of dirty sink, it may be null or empty
+     * @return The value after format
+     * @throws JsonProcessingException The exception may be thrown when executing
+     */
+    public static String jsonFormat(RowData data, RowDataToJsonConverter converter,
+            Map<String, String> labels) throws JsonProcessingException {
+        // Use a fresh node per call, a node shared by all callers could leak fields between rows and threads
+        ObjectNode node = MAPPER.createObjectNode();
+        return jsonFormat(converter.convert(MAPPER, node, data), labels);
+    }
+
+    /**
+     * Json format for 'JsonNode', the labels are written first and then the fields of the data
+     *
+     * @param data The data wrapper with 'JsonNode'
+     * @param labels The labels of dirty sink, it may be null or empty the same as for the 'csv' format
+     * @return The value after format
+     * @throws JsonProcessingException The exception may be thrown when executing
+     */
+    public static String jsonFormat(JsonNode data, Map<String, String> labels)
+            throws JsonProcessingException {
+        Map<String, Object> result = new LinkedHashMap<>();
+        if (labels != null) {
+            result.putAll(labels);
+        }
+        data.fields().forEachRemaining(kv -> result.put(kv.getKey(), kv.getValue()));
+        return MAPPER.writeValueAsString(result);
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/LabelUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/LabelUtils.java
new file mode 100644
index 000000000..24e0c5ad7
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/LabelUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.KEY_VALUE_DELIMITER;
+
+/**
+ * The label utils class, it is used to parse the labels into a label map
+ */
+public final class LabelUtils {
+
+    private LabelUtils() {
+    }
+
+    /**
+     * Parse the labels into a new label map
+     *
+     * @param labels The labels format by 'key1=value1&key2=value2...'
+     * @return The label map of labels
+     */
+    public static Map<String, String> parseLabels(String labels) {
+        return parseLabels(labels, new LinkedHashMap<>());
+    }
+
+    /**
+     * Parse the labels and put them into the given label map
+     *
+     * @param labels The labels format by 'key1=value1&key2=value2...', nothing is added if it is null or empty
+     * @param labelMap The map that receives the labels, a new 'LinkedHashMap' is used if it is null
+     * @return The label map that holds the parsed labels
+     * @throws IllegalArgumentException If a label does not match the format
+     */
+    public static Map<String, String> parseLabels(String labels, Map<String, String> labelMap) {
+        final Map<String, String> target = labelMap != null ? labelMap : new LinkedHashMap<>();
+        if (labels == null || labels.isEmpty()) {
+            return target;
+        }
+        for (String pair : labels.split(DELIMITER)) {
+            final int split = pair.indexOf(KEY_VALUE_DELIMITER);
+            // A label needs the delimiter with at least one character before it and one after it
+            if (split < 1 || split == pair.length() - 1) {
+                throw new IllegalArgumentException("The format of labels must be like 'key1=value1&key2=value2...'");
+            }
+            target.put(pair.substring(0, split), pair.substring(split + 1));
+        }
+        return target;
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/PatternReplaceUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/PatternReplaceUtils.java
new file mode 100644
index 000000000..7a6938dae
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/PatternReplaceUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * The pattern replace utils, it fills the '${key}' placeholders of a pattern with the values of a parameter map
+ */
+public final class PatternReplaceUtils {
+
+    private static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}",
+            Pattern.CASE_INSENSITIVE);
+
+    /** Replace each placeholder by its value taken literally, or by the bare key if params has no value for it */
+    public static String replace(String pattern, Map<String, String> params) {
+        if (pattern == null) {
+            return pattern;
+        }
+        Matcher matcher = REGEX_PATTERN.matcher(pattern);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            String value = params == null ? null : params.get(matcher.group(1));
+            // Quote the replacement, otherwise '$' and '\' in it would be read as group references and escapes
+            matcher.appendReplacement(sb, Matcher.quoteReplacement(value == null ? matcher.group(1) : value));
+        }
+        return matcher.appendTail(sb).toString();
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 000000000..412dedf67
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
new file mode 100644
index 000000000..386766ddd
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Test for {@link org.apache.inlong.sort.base.dirty.utils.FormatUtils}
+ */
+public class FormatUtilsTest {
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private JsonNode jsonNode;
+    private RowData rowData;
+    private FieldGetter[] fieldGetters;
+    private Map<String, String> labelMap;
+    private ResolvedSchema schema;
+
+    @Before
+    public void init() throws JsonProcessingException {
+        schema = ResolvedSchema.of(
+                Column.physical("id", DataTypes.BIGINT()),
+                Column.physical("name", DataTypes.STRING()),
+                Column.physical("age", DataTypes.INT()));
+        List<LogicalType> logicalTypes = schema.toPhysicalRowDataType()
+                .getChildren().stream().map(DataType::getLogicalType).collect(Collectors.toList());
+        fieldGetters = new RowData.FieldGetter[logicalTypes.size()];
+        for (int i = 0; i < logicalTypes.size(); i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes.get(i), i);
+        }
+        rowData = GenericRowData.of(1L, StringData.fromString("leo"), 18); // The same record as 'jsonStr' below
+        String jsonStr = "{\"id\":1,\"name\":\"leo\",\"age\":18}";
+        jsonNode = mapper.readValue(jsonStr, JsonNode.class);
+        labelMap = new HashMap<>(); // NOTE(review): csv test relies on iteration order; LinkedHashMap would be safer
+        labelMap.put("database", "inlong");
+        labelMap.put("table", "student");
+    }
+
+    @Test
+    public void testCsvFormat() {
+        String expected = "inlong,student,1,leo,18"; // The label values come first, then the fields of the data
+        Assert.assertEquals(expected, FormatUtils.csvFormat(jsonNode, labelMap, ","));
+        Assert.assertEquals(expected, FormatUtils.csvFormat(rowData, fieldGetters, labelMap, ","));
+        Assert.assertEquals(expected, FormatUtils.csvFormat("1,leo,18", labelMap, ",")); // Added via toString()
+    }
+
+    @Test
+    public void testJsonFormat() throws JsonProcessingException {
+        RowDataToJsonConverter converter = new RowDataToJsonConverters(TimestampFormat.SQL,
+                MapNullKeyMode.DROP, null)
+                        .createConverter(schema.toPhysicalRowDataType().getLogicalType());
+        String expectedStr = "{\"database\":\"inlong\",\"table\":\"student\",\"id\":1,\"name\":\"leo\",\"age\":18}";
+        JsonNode expected = mapper.readValue(expectedStr, JsonNode.class);
+        String actual = FormatUtils.jsonFormat(jsonNode, labelMap);
+        Assert.assertEquals(expected, mapper.readValue(actual, JsonNode.class)); // Compared as parsed json trees
+        actual = FormatUtils.jsonFormat(rowData, converter, labelMap);
+        Assert.assertEquals(expected, mapper.readValue(actual, JsonNode.class));
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/LabelUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/LabelUtilsTest.java
new file mode 100644
index 000000000..0f7046f5c
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/LabelUtilsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Test for {@link LabelUtils}, it parses two labels and compares them with the expected label map
+ */
+public class LabelUtilsTest {
+
+    @Test
+    public void testParseLabels() {
+        Map<String, String> actual = LabelUtils.parseLabels("database=inlong&table=student");
+        Map<String, String> expected = new LinkedHashMap<>();
+        expected.put("database", "inlong");
+        expected.put("table", "student");
+        Assert.assertEquals(expected, actual);
+    }
+}
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/PatternReplaceUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/PatternReplaceUtilsTest.java
new file mode 100644
index 000000000..307c215c0
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/PatternReplaceUtilsTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test for {@link PatternReplaceUtils}
+ */
+public class PatternReplaceUtilsTest {
+
+    @Test
+    public void testReplace() {
+        String pattern = "database=${database}&table=${table}";
+        Map<String, String> params = new HashMap<>();
+        params.put("database", "inlong");
+        params.put("table", "student");
+        String expected = "database=inlong&table=student";
+        Assert.assertEquals(expected, PatternReplaceUtils.replace(pattern, params));
+    }
+}