You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/24 03:37:10 UTC

[inlong] branch master updated: [INLONG-6495][Sort] Support metrics and restore metrics for single-table of Doris (#6541)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 09f085d61 [INLONG-6495][Sort] Support metrics and restore metrics for single-table of Doris (#6541)
09f085d61 is described below

commit 09f085d61fe6525b89240a2f9c45b3dfe139e68f
Author: Yizhou Yang <32...@users.noreply.github.com>
AuthorDate: Thu Nov 24 11:37:05 2022 +0800

    [INLONG-6495][Sort] Support metrics and restore metrics for single-table of Doris (#6541)
    
    * Support metrics and restore metrics for single-table of Doris
    
    * Reformat some code and resolve the conflict
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../table/DorisDynamicSchemaOutputFormat.java      | 229 ++++++++++++++++++---
 .../sort/doris/table/DorisDynamicTableSink.java    |  54 +----
 .../inlong/sort/doris/util/DorisParseUtils.java    |  70 +++++++
 3 files changed, 284 insertions(+), 69 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index b7e0188e1..3f2f9af3f 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -24,6 +24,7 @@ import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.rest.models.Schema;
 import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.common.state.ListState;
@@ -36,6 +37,8 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
@@ -45,22 +48,28 @@ import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.doris.model.RespContent;
+import org.apache.inlong.sort.doris.util.DorisParseUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.StringJoiner;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -85,6 +94,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
      * Mark the record for not delete
      */
     private static final String DORIS_DELETE_FALSE = "0";
+    private static final String FORMAT_JSON_VALUE = "json";
+    private static final String FORMAT_KEY = "format";
+    private static final String FIELD_DELIMITER_KEY = "column_separator";
+    private static final String FIELD_DELIMITER_DEFAULT = "\t";
+    private static final String LINE_DELIMITER_KEY = "line_delimiter";
+    private static final String LINE_DELIMITER_DEFAULT = "\n";
+    private static final String NULL_VALUE = "\\N";
+    private static final String ESCAPE_DELIMITERS_KEY = "escape_delimiters";
+    private static final String ESCAPE_DELIMITERS_DEFAULT = "false";
+    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
     @SuppressWarnings({"rawtypes"})
     private final Map<String, List> batchMap = new HashMap<>();
     private final Map<String, String> columnsMap = new HashMap<>();
@@ -92,10 +111,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
     private final DorisExecutionOptions executionOptions;
-    private final String databasePattern;
-    private final String tablePattern;
-    private final String dynamicSchemaFormat;
-    private final boolean ignoreSingleTableErrors;
     private final Map<String, Exception> flushExceptionMap = new HashMap<>();
     private final AtomicLong readInNum = new AtomicLong(0);
     private final AtomicLong writeOutNum = new AtomicLong(0);
@@ -103,6 +118,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private final AtomicLong ddlNum = new AtomicLong(0);
     private final String inlongMetric;
     private final String auditHostAndPorts;
+    private final boolean multipleSink;
+    private volatile String tableIdentifier;
+    private volatile String databasePattern;
+    private volatile String tablePattern;
+    private volatile String dynamicSchemaFormat;
+    private volatile boolean ignoreSingleTableErrors;
     private long batchBytes = 0L;
     private int size;
     private DorisStreamLoad dorisStreamLoad;
@@ -114,6 +135,13 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private transient SinkMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
+    private volatile String[] fieldNames;
+    private volatile boolean jsonFormat;
+    private String keysType;
+    private volatile RowData.FieldGetter[] fieldGetters;
+    private String fieldDelimiter;
+    private String lineDelimiter;
+    private volatile LogicalType[] logicalTypes;
 
     public DorisDynamicSchemaOutputFormat(DorisOptions option,
             DorisReadOptions readOptions,
@@ -123,7 +151,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             String tablePattern,
             boolean ignoreSingleTableErrors,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            boolean multipleSink) {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
@@ -131,10 +160,31 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
         this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+        this.multipleSink = multipleSink;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
     }
 
+    public DorisDynamicSchemaOutputFormat(DorisOptions option,
+            DorisReadOptions readOptions,
+            DorisExecutionOptions executionOptions,
+            String tableIdentifier,
+            LogicalType[] logicalTypes,
+            String[] fieldNames,
+            String inlongMetric,
+            String auditHostAndPorts,
+            boolean multipleSink) {
+        this.options = option;
+        this.readOptions = readOptions;
+        this.executionOptions = executionOptions;
+        this.tableIdentifier = tableIdentifier;
+        this.fieldNames = fieldNames;
+        this.multipleSink = multipleSink;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.logicalTypes = logicalTypes;
+    }
+
     /**
      * A builder used to set parameters to the output format's configuration in a fluent way.
      *
@@ -144,8 +194,75 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         return new DorisDynamicSchemaOutputFormat.Builder();
     }
 
+    /**
+     * Fetches the keys model of the configured Doris table (e.g. {@code UNIQUE_KEYS}) via {@link RestService}.
+     *
+     * @return the keys type reported by {@link RestService#getSchema}
+     * @throws RuntimeException wrapping the {@link DorisException} when the schema cannot be fetched
+     */
+    private String parseKeysType() {
+        try {
+            Schema schema = RestService.getSchema(options, readOptions, LOG);
+            return schema.getKeysType();
+        } catch (DorisException e) {
+            throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier(), e);
+        }
+    }
+
+    /**
+     * Normalizes the stream-load properties for the single-table sink.
+     * <p/>
+     * Resolves the field and line delimiters (unescaping them with {@link DorisParseUtils#escapeString} when
+     * {@code escape_delimiters} is true), derives the {@code columns} property from {@code fieldNames} when it
+     * is not configured, and appends {@code __DORIS_DELETE_SIGN__} to it when batch delete is enabled.
+     * The stream-load {@link Properties} held by {@code executionOptions} are modified in place.
+     */
+    private void handleStreamLoadProp() {
+        Properties props = executionOptions.getStreamLoadProp();
+        boolean ifEscape = Boolean.parseBoolean(props.getProperty(ESCAPE_DELIMITERS_KEY, ESCAPE_DELIMITERS_DEFAULT));
+        this.fieldDelimiter = props.getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
+        this.lineDelimiter = props.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+        if (ifEscape) {
+            this.fieldDelimiter = DorisParseUtils.escapeString(fieldDelimiter);
+            this.lineDelimiter = DorisParseUtils.escapeString(lineDelimiter);
+            props.remove(ESCAPE_DELIMITERS_KEY);
+        }
+
+        // add column key when fieldNames is not empty
+        if (!props.containsKey(COLUMNS_KEY) && fieldNames != null && fieldNames.length > 0) {
+            String columns = Arrays.stream(fieldNames)
+                    .map(item -> String.format("`%s`", item.trim().replace("`", "")))
+                    .collect(Collectors.joining(","));
+            props.put(COLUMNS_KEY, columns);
+        }
+
+        // if enable batch delete, the columns must add tag '__DORIS_DELETE_SIGN__';
+        // the columns key may still be absent here (nothing configured and no field names), so skip it then
+        String columns = (String) props.get(COLUMNS_KEY);
+        if (columns != null && !columns.contains(DORIS_DELETE_SIGN) && enableBatchDelete()) {
+            props.put(COLUMNS_KEY, String.format("%s,%s", columns, DORIS_DELETE_SIGN));
+        }
+    }
+
+    /**
+     * Whether every row must carry the {@code __DORIS_DELETE_SIGN__} column: true when delete is explicitly
+     * enabled, or, for the single-table sink only, when the target table uses the {@code UNIQUE_KEYS} model.
+     */
     private boolean enableBatchDelete() {
-        return executionOptions.getEnableDelete();
+        if (executionOptions.getEnableDelete()) {
+            return true;
+        }
+        if (multipleSink) {
+            // The multiple-sink output format is built without a concrete table identifier, so there is no
+            // single table schema to inspect; only the explicit enable-delete option applies there.
+            return false;
+        }
+        // Cache the keys type: addSingle calls this method for every row and each lookup goes through
+        // RestService. A failed lookup throws and leaves the cache empty, so the next call retries.
+        if (keysType == null) {
+            keysType = parseKeysType();
+        }
+        return UNIQUE_KEYS_TYPE.equals(keysType);
     }
 
     @Override
@@ -154,13 +244,21 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
-        dorisStreamLoad = new DorisStreamLoad(
-                getBackend(),
-                options.getUsername(),
-                options.getPassword(),
-                executionOptions.getStreamLoadProp());
-        jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
-                DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+        Properties loadProps = executionOptions.getStreamLoadProp();
+        dorisStreamLoad = new DorisStreamLoad(getBackend(), options.getUsername(), options.getPassword(), loadProps);
+        if (!multipleSink) {
+            this.jsonFormat = true;
+            handleStreamLoadProp();
+            this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
+            for (int i = 0; i < logicalTypes.length; i++) {
+                fieldGetters[i] = RowData.createFieldGetter(logicalTypes[i], i);
+            }
+        }
+
+        if (multipleSink && StringUtils.isNotBlank(dynamicSchemaFormat)) {
+            jsonDynamicSchemaFormat =
+                    (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+        }
         MetricOption metricOption = MetricOption.builder()
                 .withInlongLabels(inlongMetric)
                 .withInlongAudit(auditHostAndPorts)
@@ -183,14 +281,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     }
 
     private boolean checkFlushException(String tableIdentifier) {
-        Exception flushException = flushExceptionMap.get(tableIdentifier);
-        if (flushException == null) {
+        Exception ex = flushExceptionMap.get(tableIdentifier);
+        if (!multipleSink || ex == null) {
             return false;
         }
         if (!ignoreSingleTableErrors) {
-            throw new RuntimeException(
-                    String.format("Writing records to streamload of tableIdentifier:%s failed.", tableIdentifier),
-                    flushException);
+            throw new RuntimeException("Writing records to streamload failed, tableIdentifier=" + tableIdentifier, ex);
         }
         return true;
     }
@@ -205,8 +301,70 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         }
     }
 
+    /**
+     * Buffers one record for the single-table sink (called by addBatch when multipleSink is false).
+     * <p/>
+     * A {@link RowData} is serialized with {@code fieldGetters}: into a field-name to value map when
+     * {@code jsonFormat} is set, otherwise into a line joined by {@code fieldDelimiter} with nulls written as
+     * {@code \N}. When batch delete is enabled the delete sign derived from the row kind is appended as well.
+     * A {@link String} record is buffered unchanged. {@code batchBytes} grows by the UTF-8 size of the buffered
+     * names and values (the delete sign is not counted).
+     *
+     * @param row the record to buffer; must be a {@link RowData} or a {@link String}
+     * @throws RuntimeException if the record has any other type
+     */
+    public void addSingle(T row) {
+        if (row instanceof RowData) {
+            RowData rowData = (RowData) row;
+            Map<String, String> valueMap = new HashMap<>();
+            StringJoiner value = new StringJoiner(this.fieldDelimiter);
+            for (int i = 0; i < rowData.getArity() && i < fieldGetters.length; ++i) {
+                Object field = fieldGetters[i].getFieldOrNull(rowData);
+                if (jsonFormat) {
+                    String data = field != null ? field.toString() : null;
+                    valueMap.put(this.fieldNames[i], data);
+                    batchBytes += this.fieldNames[i].getBytes(StandardCharsets.UTF_8).length;
+                    if (data != null) {
+                        batchBytes += data.getBytes(StandardCharsets.UTF_8).length;
+                    }
+                } else {
+                    String data = field != null ? field.toString() : NULL_VALUE;
+                    value.add(data);
+                    batchBytes += data.getBytes(StandardCharsets.UTF_8).length;
+                }
+            }
+            // add doris delete sign
+            if (enableBatchDelete()) {
+                String deleteSign = DorisParseUtils.parseDeleteSign(rowData.getRowKind());
+                if (jsonFormat) {
+                    valueMap.put(DORIS_DELETE_SIGN, deleteSign);
+                } else {
+                    value.add(deleteSign);
+                }
+            }
+            appendToBatch(jsonFormat ? valueMap : value.toString());
+        } else if (row instanceof String) {
+            batchBytes += ((String) row).getBytes(StandardCharsets.UTF_8).length;
+            appendToBatch(row);
+        } else {
+            throw new RuntimeException("The type of element should be 'RowData' or 'String' only.");
+        }
+    }
+
+    /**
+     * Appends one serialized record to the buffer of {@code tableIdentifier}, creating the buffer on first use.
+     */
+    @SuppressWarnings({"unchecked"})
+    private void appendToBatch(Object data) {
+        batchMap.computeIfAbsent(tableIdentifier, k -> new ArrayList<>()).add(data);
+    }
+
     @SuppressWarnings({"unchecked"})
     private void addBatch(T row) throws IOException {
+        if (!multipleSink) {
+            addSingle(row);
+            return;
+        }
         if (row instanceof RowData) {
             RowData rowData = (RowData) row;
             JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
@@ -273,7 +415,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                             break;
                         default:
                             errorNum.incrementAndGet();
-                            throw new RuntimeException("Unrecognized row kind:" + rowKind.toString());
+                            throw new RuntimeException("Unrecognized row kind:" + rowKind);
                     }
                 }
             }
@@ -359,6 +501,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             flushing = false;
             return;
         }
+
         for (Entry<String, List> kvs : batchMap.entrySet()) {
             flushSingleTable(kvs.getKey(), kvs.getValue());
         }
@@ -379,6 +522,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         if (checkFlushException(tableIdentifier) || values == null || values.isEmpty()) {
             return;
         }
+
         String loadValue = null;
         try {
             loadValue = OBJECT_MAPPER.writeValueAsString(values);
@@ -390,7 +534,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             } catch (Exception e) {
                 LOG.warn("metricData invoke get err:", e);
             }
-            LOG.info("load {} records to tableIdentifier: {}", values.size(), tableIdentifier);
             writeOutNum.addAndGet(values.size());
             // Clean the data that has been loaded.
             values.clear();
@@ -426,8 +569,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private RespContent load(String tableIdentifier, String result) throws IOException {
         String[] tableWithDb = tableIdentifier.split("\\.");
         RespContent respContent = null;
-        // Dynamic set COLUMNS_KEY for tableIdentifier every time
-        executionOptions.getStreamLoadProp().put(COLUMNS_KEY, columnsMap.get(tableIdentifier));
+        // Dynamic set COLUMNS_KEY for tableIdentifier every time for multiple sink scenario
+        if (multipleSink) {
+            executionOptions.getStreamLoadProp().put(COLUMNS_KEY, columnsMap.get(tableIdentifier));
+        }
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
                 respContent = dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
@@ -441,7 +586,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                     dorisStreamLoad.setHostPort(getBackend());
                     LOG.warn("streamload error,switch be: {}",
                             dorisStreamLoad.getLoadUrlStr(tableWithDb[0], tableWithDb[1]), e);
-                    Thread.sleep(1000 * i);
+                    Thread.sleep(1000L * i);
                 } catch (InterruptedException ex) {
                     Thread.currentThread().interrupt();
                     throw new IOException("unable to flush; interrupted while doing another attempt", e);
@@ -493,8 +638,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         private String databasePattern;
         private String tablePattern;
         private boolean ignoreSingleTableErrors;
+        private boolean multipleSink;
         private String inlongMetric;
         private String auditHostAndPorts;
+        private String tableIdentifier;
+        private DataType[] fieldDataTypes;
+        private String[] fieldNames;
 
         public Builder() {
             this.optionsBuilder = DorisOptions.builder().setTableIdentifier("");
@@ -515,6 +664,21 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             return this;
         }
 
+        public DorisDynamicSchemaOutputFormat.Builder setTableIdentifier(String tableIdentifier) {
+            this.tableIdentifier = tableIdentifier;
+            return this;
+        }
+
+        public DorisDynamicSchemaOutputFormat.Builder setFieldDataTypes(DataType[] fieldDataTypes) {
+            this.fieldDataTypes = fieldDataTypes;
+            return this;
+        }
+
+        public DorisDynamicSchemaOutputFormat.Builder setFieldNames(String[] fieldNames) {
+            this.fieldNames = fieldNames;
+            return this;
+        }
+
         public DorisDynamicSchemaOutputFormat.Builder setReadOptions(DorisReadOptions readOptions) {
             this.readOptions = readOptions;
             return this;
@@ -546,6 +710,11 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             return this;
         }
 
+        public DorisDynamicSchemaOutputFormat.Builder setMultipleSink(boolean multipleSink) {
+            this.multipleSink = multipleSink;
+            return this;
+        }
+
         public DorisDynamicSchemaOutputFormat.Builder setInlongMetric(String inlongMetric) {
             this.inlongMetric = inlongMetric;
             return this;
@@ -558,10 +727,20 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
         @SuppressWarnings({"rawtypes"})
         public DorisDynamicSchemaOutputFormat build() {
+            if (!multipleSink) {
+                LogicalType[] logicalTypes = Arrays.stream(fieldDataTypes)
+                        .map(DataType::getLogicalType).toArray(LogicalType[]::new);
+
+                return new DorisDynamicSchemaOutputFormat(
+                        optionsBuilder.setTableIdentifier(tableIdentifier).build(), readOptions, executionOptions,
+                        tableIdentifier,
+                        logicalTypes, fieldNames,
+                        inlongMetric, auditHostAndPorts, multipleSink);
+            }
             return new DorisDynamicSchemaOutputFormat(
                     optionsBuilder.build(), readOptions, executionOptions,
                     dynamicSchemaFormat, databasePattern, tablePattern,
-                    ignoreSingleTableErrors, inlongMetric, auditHostAndPorts);
+                    ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, multipleSink);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index c6cdd4df6..76a53e659 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -17,23 +17,15 @@
 
 package org.apache.inlong.sort.doris.table;
 
-import java.util.Properties;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
-import org.apache.doris.flink.exception.DorisException;
-import org.apache.doris.flink.rest.RestService;
-import org.apache.doris.flink.rest.models.Schema;
-import org.apache.doris.flink.table.DorisDynamicOutputFormat;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.flink.table.connector.sink.OutputFormatProvider;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.types.RowKind;
 import org.apache.inlong.sort.doris.internal.GenericDorisSinkFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * DorisDynamicTableSink copy from {@link org.apache.doris.flink.table.DorisDynamicTableSink}
@@ -41,8 +33,6 @@ import org.slf4j.LoggerFactory;
  **/
 public class DorisDynamicTableSink implements DynamicTableSink {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicTableSink.class);
-
     private final DorisOptions options;
     private final DorisReadOptions readOptions;
     private final DorisExecutionOptions executionOptions;
@@ -55,9 +45,6 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     private final String inlongMetric;
     private final String auditHostAndPorts;
     private final Integer parallelism;
-    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
-    private static final String COLUMNS_KEY = "columns";
-    private static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
 
     public DorisDynamicTableSink(DorisOptions options,
             DorisReadOptions readOptions,
@@ -97,37 +84,25 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     @SuppressWarnings({"unchecked"})
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
-        if (!multipleSink) {
-            Properties loadProperties = executionOptions.getStreamLoadProp();
-            // if enable batch delete, the columns must add tag '__DORIS_DELETE_SIGN__'
-            String columns = (String) loadProperties.get(COLUMNS_KEY);
-            if (loadProperties.containsKey(COLUMNS_KEY) && !columns.contains(DORIS_DELETE_SIGN)
-                    && enableBatchDelete()) {
-                loadProperties.put(COLUMNS_KEY, String.format("%s,%s", columns, DORIS_DELETE_SIGN));
-            }
-            DorisDynamicOutputFormat.Builder builder = DorisDynamicOutputFormat.builder()
-                    .setFenodes(options.getFenodes())
-                    .setUsername(options.getUsername())
-                    .setPassword(options.getPassword())
-                    .setTableIdentifier(options.getTableIdentifier())
-                    .setReadOptions(readOptions)
-                    .setExecutionOptions(executionOptions)
-                    .setFieldDataTypes(tableSchema.getFieldDataTypes())
-                    .setFieldNames(tableSchema.getFieldNames());
-            return OutputFormatProvider.of(builder.build());
-        }
         DorisDynamicSchemaOutputFormat.Builder builder = DorisDynamicSchemaOutputFormat.builder()
                 .setFenodes(options.getFenodes())
                 .setUsername(options.getUsername())
                 .setPassword(options.getPassword())
                 .setReadOptions(readOptions)
                 .setExecutionOptions(executionOptions)
+                .setInlongMetric(inlongMetric)
+                .setAuditHostAndPorts(auditHostAndPorts)
+                .setMultipleSink(multipleSink)
+                // single-table settings, only used by the output format when multipleSink is false
+                .setTableIdentifier(options.getTableIdentifier())
+                .setFieldDataTypes(tableSchema.getFieldDataTypes())
+                .setFieldNames(tableSchema.getFieldNames())
+                // multiple-sink settings, only used by the output format when multipleSink is true
                 .setDatabasePattern(databasePattern)
                 .setTablePattern(tablePattern)
                 .setDynamicSchemaFormat(sinkMultipleFormat)
-                .setIgnoreSingleTableErrors(ignoreSingleTableErrors)
-                .setInlongMetric(inlongMetric)
-                .setAuditHostAndPorts(auditHostAndPorts);
+                .setIgnoreSingleTableErrors(ignoreSingleTableErrors);
+
         return SinkFunctionProvider.of(
                 new GenericDorisSinkFunction<>(builder.build()), parallelism);
     }
@@ -143,14 +118,5 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     public String asSummaryString() {
         return "Doris Table Sink Of InLong";
     }
-
-    private boolean enableBatchDelete() {
-        try {
-            Schema schema = RestService.getSchema(options, readOptions, LOG);
-            return executionOptions.getEnableDelete() || UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
-        } catch (DorisException e) {
-            throw new RuntimeException("Failed fetch doris table schema: " + options.getTableIdentifier(), e);
-        }
-    }
 }
 
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
new file mode 100644
index 000000000..81f82762d
--- /dev/null
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.util;
+
+import org.apache.flink.types.RowKind;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Stateless parsing helpers for the Doris sink, primarily serving DorisDynamicSchemaOutputFormat.
+ */
+public class DorisParseUtils {
+
+    /**
+     * Pattern of a hexadecimal escape sequence such as {@code \x09}: a backslash, an 'x' and exactly two
+     * hexadecimal digits, which are captured as group 1.
+     */
+    private static final Pattern HEX_PATTERN = Pattern.compile("\\\\x([0-9a-fA-F]{2})");
+
+    private DorisParseUtils() {
+    }
+
+    /**
+     * A utility function used to determine the DORIS_DELETE_SIGN for a row change.
+     *
+     * @param rowKind the row change
+     * @return "0" for INSERT and UPDATE_AFTER, "1" for DELETE and UPDATE_BEFORE
+     * @throws RuntimeException if the row kind is none of the above (e.g. null)
+     */
+    public static String parseDeleteSign(RowKind rowKind) {
+        if (RowKind.INSERT.equals(rowKind) || RowKind.UPDATE_AFTER.equals(rowKind)) {
+            return "0";
+        } else if (RowKind.DELETE.equals(rowKind) || RowKind.UPDATE_BEFORE.equals(rowKind)) {
+            return "1";
+        } else {
+            throw new RuntimeException("Unrecognized row kind: " + rowKind);
+        }
+    }
+
+    /**
+     * Replaces every hexadecimal escape sequence {@code \xHH} in the given string with the character whose
+     * code is {@code 0xHH}; all other text is kept unchanged.
+     * <p/>
+     * Example input: {@code "hi\x21hi\x2chello"}, where \x21 is '!' and \x2c is ','.
+     * Example output: {@code "hi!hi,hello"}
+     *
+     * @param hexStr string that may contain hexadecimal escape sequences
+     * @return the string with every escape sequence replaced by the character it encodes
+     */
+    public static String escapeString(String hexStr) {
+        Matcher matcher = HEX_PATTERN.matcher(hexStr);
+        StringBuffer buf = new StringBuffer();
+        while (matcher.find()) {
+            // The digits are hexadecimal ("\x2c" is ','), and the replacement is quoted so that characters
+            // such as '$' ("\x24") are appended literally instead of being read as group references.
+            char decoded = (char) Integer.parseInt(matcher.group(1), 16);
+            matcher.appendReplacement(buf, Matcher.quoteReplacement(String.valueOf(decoded)));
+        }
+        matcher.appendTail(buf);
+        return buf.toString();
+    }
+
+}