You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/01 11:56:21 UTC

[inlong] branch master updated: [INLONG-6345][Sort] Add metrics for Doris connector (#6346)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c48e6fe6 [INLONG-6345][Sort] Add metrics for Doris connector (#6346)
5c48e6fe6 is described below

commit 5c48e6fe67fd635237e4ac1133cbf167a4427ec0
Author: kuansix <49...@qq.com>
AuthorDate: Tue Nov 1 19:56:15 2022 +0800

    [INLONG-6345][Sort] Add metrics for Doris connector (#6346)
---
 .../sort/protocol/node/load/DorisLoadNode.java     |  3 +-
 .../doris/internal/GenericDorisSinkFunction.java   | 78 ++++++++++++++++++
 .../inlong/sort/doris/model/RespContent.java       | 96 ++++++++++++++++++++++
 .../table/DorisDynamicSchemaOutputFormat.java      | 87 ++++++++++++++++++--
 .../sort/doris/table/DorisDynamicTableFactory.java | 10 ++-
 .../sort/doris/table/DorisDynamicTableSink.java    | 23 +++++-
 .../inlong/sort/doris/table/DorisStreamLoad.java   |  5 +-
 licenses/inlong-sort-connectors/LICENSE            |  1 +
 8 files changed, 290 insertions(+), 13 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
index 583970430..a281f24fe 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNode.java
@@ -27,6 +27,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
 import org.apache.inlong.sort.protocol.constant.DorisConstant;
 import org.apache.inlong.sort.protocol.enums.FilterStrategy;
 import org.apache.inlong.sort.protocol.node.LoadNode;
@@ -53,7 +54,7 @@ import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIP
 @JsonInclude(Include.NON_NULL)
 @Data
 @NoArgsConstructor
-public class DorisLoadNode extends LoadNode implements Serializable {
+public class DorisLoadNode extends LoadNode implements InlongMetric, Serializable {
 
     private static final long serialVersionUID = -8002903269814211382L;
 
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java
new file mode 100644
index 000000000..74671fb37
--- /dev/null
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/internal/GenericDorisSinkFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.internal;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.doris.table.DorisDynamicSchemaOutputFormat;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * A generic SinkFunction for Doris that delegates open/write/flush/close to a
+ * {@link DorisDynamicSchemaOutputFormat} and, as a {@link CheckpointedFunction}, forwards state
+ * initialization and snapshots to it so the metric state behind the `inlong.metric` option can be checkpointed.
+ */
+@Internal
+public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
+        implements CheckpointedFunction {
+
+    private final DorisDynamicSchemaOutputFormat<T> outputFormat;
+
+    public GenericDorisSinkFunction(@Nonnull DorisDynamicSchemaOutputFormat<T> outputFormat) {
+        this.outputFormat = Preconditions.checkNotNull(outputFormat);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        RuntimeContext ctx = getRuntimeContext();
+        outputFormat.setRuntimeContext(ctx); // hand the sink's runtime context to the format before opening it
+        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+    }
+
+    @Override
+    public void invoke(T value, Context context) throws IOException {
+        outputFormat.writeRecord(value);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        outputFormat.setRuntimeContext(getRuntimeContext()); // the format's initializeState() calls getRuntimeContext()
+        outputFormat.initializeState(context);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        outputFormat.flush(); // NOTE(review): presumably drains buffered rows so metrics are current before the snapshot - confirm
+        outputFormat.snapshotState(context);
+    }
+
+    @Override
+    public void close() throws IOException {
+        outputFormat.close(); // only the format is closed; super.close() is not invoked here
+    }
+}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java
new file mode 100644
index 000000000..5a43f05aa
--- /dev/null
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.doris.model;
+
+import lombok.Data;
+import org.apache.doris.shaded.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.apache.doris.shaded.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Response body of a Doris stream load, copied from {@link org.apache.doris.flink.rest.models.RespContent}.
+ **/
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RespContent {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // used only by toString()
+
+    @JsonProperty(value = "TxnId")
+    private int txnId; // NOTE(review): int may be too narrow for Doris transaction ids - confirm
+
+    @JsonProperty(value = "Label")
+    private String label;
+
+    @JsonProperty(value = "Status")
+    private String status;
+
+    @JsonProperty(value = "ExistingJobStatus")
+    private String existingJobStatus;
+
+    @JsonProperty(value = "Message")
+    private String message;
+
+    @JsonProperty(value = "NumberTotalRows")
+    private long numberTotalRows;
+
+    @JsonProperty(value = "NumberLoadedRows")
+    private long numberLoadedRows;
+
+    @JsonProperty(value = "NumberFilteredRows")
+    private int numberFilteredRows;
+
+    @JsonProperty(value = "NumberUnselectedRows")
+    private int numberUnselectedRows;
+
+    @JsonProperty(value = "LoadBytes")
+    private long loadBytes;
+
+    @JsonProperty(value = "LoadTimeMs")
+    private int loadTimeMs;
+
+    @JsonProperty(value = "BeginTxnTimeMs")
+    private int beginTxnTimeMs;
+
+    @JsonProperty(value = "StreamLoadPutTimeMs")
+    private int streamLoadPutTimeMs;
+
+    @JsonProperty(value = "ReadDataTimeMs")
+    private int readDataTimeMs;
+
+    @JsonProperty(value = "WriteDataTimeMs")
+    private int writeDataTimeMs;
+
+    @JsonProperty(value = "CommitAndPublishTimeMs")
+    private int commitAndPublishTimeMs;
+
+    @JsonProperty(value = "ErrorURL")
+    private String errorURL;
+
+    @Override
+    public String toString() { // renders this object as JSON
+        try {
+            return OBJECT_MAPPER.writeValueAsString(this);
+        } catch (JsonProcessingException e) {
+            return ""; // serialization failure is swallowed; callers get an empty string
+        }
+    }
+}
+
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 9e8142ae7..8e4100eb6 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -26,13 +26,24 @@ import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.doris.model.RespContent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +60,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat}
  * It is used in the multiple sink scenario, in this scenario, we directly convert the data format by
@@ -91,13 +106,21 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private transient ScheduledFuture<?> scheduledFuture;
     private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
 
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private transient SinkMetricData metricData;
+    private transient ListState<MetricState> metricStateListState;
+    private transient MetricState metricState;
+
     public DorisDynamicSchemaOutputFormat(DorisOptions option,
             DorisReadOptions readOptions,
             DorisExecutionOptions executionOptions,
             String dynamicSchemaFormat,
             String databasePattern,
             String tablePattern,
-            boolean ignoreSingleTableErrors) {
+            boolean ignoreSingleTableErrors,
+            String inlongMetric,
+            String auditHostAndPorts) {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
@@ -105,6 +128,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
         this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
     }
 
     /**
@@ -133,6 +158,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                 executionOptions.getStreamLoadProp());
         jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
                 DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+                .build();
+        if (metricOption != null) {
+            metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
             this.scheduler = new ScheduledThreadPoolExecutor(1,
                     new ExecutorThreadFactory("doris-streamload-output-format"));
@@ -276,7 +311,14 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             String loadValue = null;
             try {
                 loadValue = OBJECT_MAPPER.writeValueAsString(kvs.getValue());
-                load(kvs.getKey(), loadValue);
+                RespContent respContent = load(kvs.getKey(), loadValue);
+                try {
+                    if (null != metricData && null != respContent) {
+                        metricData.invoke(respContent.getNumberLoadedRows(), respContent.getLoadBytes());
+                    }
+                } catch (Exception e) {
+                    LOG.warn("metricData invoke get err:", e);
+                }
                 LOG.info("load {} records to tableIdentifier: {}", kvs.getValue().size(), kvs.getKey());
                 writeOutNum.addAndGet(kvs.getValue().size());
                 // Clean the data that has been loaded.
@@ -320,11 +362,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         return hasRecords;
     }
 
-    private void load(String tableIdentifier, String result) throws IOException {
+    private RespContent load(String tableIdentifier, String result) throws IOException {
         String[] tableWithDb = tableIdentifier.split("\\.");
+        RespContent respContent = null;
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
-                dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
+                respContent = dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
                 break;
             } catch (StreamLoadException e) {
                 LOG.error("doris sink error, retry times = {}", i, e);
@@ -342,6 +385,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                 }
             }
         }
+        return respContent;
     }
 
     private String getBackend() throws IOException {
@@ -354,6 +398,26 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         }
     }
 
+    public void snapshotState(FunctionSnapshotContext context) throws Exception { // the context argument is not used
+        if (metricData != null && metricStateListState != null) { // nothing to save unless both metrics and state exist
+            MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+                    getRuntimeContext().getIndexOfThisSubtask());
+        }
+    }
+
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        if (this.inlongMetric != null) { // the metric state is only registered when an inlongMetric value was supplied
+            this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+                    new ListStateDescriptor<>(
+                            INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() {
+                    })));
+        }
+        if (context.isRestored()) { // NOTE(review): metricStateListState is null here when inlongMetric is null - confirm restoreMetricState tolerates null
+            metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+                    getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+        }
+    }
+
     /**
      * Builder for {@link DorisDynamicSchemaOutputFormat}.
      */
@@ -366,6 +430,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         private String databasePattern;
         private String tablePattern;
         private boolean ignoreSingleTableErrors;
+        private String inlongMetric;
+        private String auditHostAndPorts;
 
         public Builder() {
             this.optionsBuilder = DorisOptions.builder().setTableIdentifier("");
@@ -417,11 +483,22 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             return this;
         }
 
+        public DorisDynamicSchemaOutputFormat.Builder setInlongMetric(String inlongMetric) {
+            this.inlongMetric = inlongMetric; // passed to the output format constructor by build(); may be null
+            return this;
+        }
+
+        public DorisDynamicSchemaOutputFormat.Builder setAuditHostAndPorts(String auditHostAndPorts) {
+            this.auditHostAndPorts = auditHostAndPorts; // passed to the output format constructor by build(); may be null
+            return this;
+        }
+
         @SuppressWarnings({"rawtypes"})
         public DorisDynamicSchemaOutputFormat build() {
             return new DorisDynamicSchemaOutputFormat(
                     optionsBuilder.build(), readOptions, executionOptions,
-                    dynamicSchemaFormat, databasePattern, tablePattern, ignoreSingleTableErrors);
+                    dynamicSchemaFormat, databasePattern, tablePattern,
+                    ignoreSingleTableErrors, inlongMetric, auditHostAndPorts);
         }
     }
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
index 98c08c753..f3c263f93 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
@@ -59,6 +59,8 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 
 /**
  * This class copy from {@link org.apache.doris.flink.table.DorisDynamicTableFactory}
@@ -210,6 +212,9 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         options.add(SINK_MULTIPLE_TABLE_PATTERN);
         options.add(SINK_MULTIPLE_ENABLE);
         options.add(SINK_MULTIPLE_IGNORE_SINGLE_TABLE_ERRORS);
+        options.add(INLONG_METRIC);
+        options.add(INLONG_AUDIT);
+        options.add(FactoryUtil.SINK_PARALLELISM);
         return options;
     }
 
@@ -296,13 +301,16 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory
         String sinkMultipleFormat = helper.getOptions().getOptional(SINK_MULTIPLE_FORMAT).orElse(null);
         validateSinkMultiple(physicalSchema.toPhysicalRowDataType(),
                 multipleSink, sinkMultipleFormat, databasePattern, tablePattern);
+        String inlongMetric = helper.getOptions().getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
+        String auditHostAndPorts = helper.getOptions().getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
+        Integer parallelism = helper.getOptions().getOptional(FactoryUtil.SINK_PARALLELISM).orElse(1);
         // create and return dynamic table sink
         return new DorisDynamicTableSink(
                 getDorisOptions(helper.getOptions()),
                 getDorisReadOptions(helper.getOptions()),
                 getDorisExecutionOptions(helper.getOptions(), streamLoadProp),
                 physicalSchema, multipleSink, sinkMultipleFormat, databasePattern,
-                tablePattern, ignoreSingleTableErrors);
+                tablePattern, ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, parallelism);
     }
 
     private void validateSinkMultiple(DataType physicalDataType, boolean multipleSink, String sinkMultipleFormat,
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
index bc847a398..8d961fdb5 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
@@ -25,7 +25,9 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.OutputFormatProvider;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.doris.internal.GenericDorisSinkFunction;
 
 /**
  * DorisDynamicTableSink copy from {@link org.apache.doris.flink.table.DorisDynamicTableSink}
@@ -42,6 +44,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     private final String databasePattern;
     private final String tablePattern;
     private final boolean ignoreSingleTableErrors;
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private final Integer parallelism;
 
     public DorisDynamicTableSink(DorisOptions options,
             DorisReadOptions readOptions,
@@ -51,7 +56,10 @@ public class DorisDynamicTableSink implements DynamicTableSink {
             String sinkMultipleFormat,
             String databasePattern,
             String tablePattern,
-            boolean ignoreSingleTableErrors) {
+            boolean ignoreSingleTableErrors,
+            String inlongMetric,
+            String auditHostAndPorts,
+            Integer parallelism) {
         this.options = options;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
@@ -61,6 +69,9 @@ public class DorisDynamicTableSink implements DynamicTableSink {
         this.databasePattern = databasePattern;
         this.tablePattern = tablePattern;
         this.ignoreSingleTableErrors = ignoreSingleTableErrors;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.parallelism = parallelism;
     }
 
     @Override
@@ -96,14 +107,18 @@ public class DorisDynamicTableSink implements DynamicTableSink {
                 .setDatabasePattern(databasePattern)
                 .setTablePattern(tablePattern)
                 .setDynamicSchemaFormat(sinkMultipleFormat)
-                .setIgnoreSingleTableErrors(ignoreSingleTableErrors);
-        return OutputFormatProvider.of(builder.build());
+                .setIgnoreSingleTableErrors(ignoreSingleTableErrors)
+                .setInlongMetric(inlongMetric)
+                .setAuditHostAndPorts(auditHostAndPorts);
+        return SinkFunctionProvider.of(
+                new GenericDorisSinkFunction<>(builder.build()), parallelism);
     }
 
     @Override
     public DynamicTableSink copy() {
         return new DorisDynamicTableSink(options, readOptions, executionOptions, tableSchema,
-                multipleSink, sinkMultipleFormat, databasePattern, tablePattern, ignoreSingleTableErrors);
+                multipleSink, sinkMultipleFormat, databasePattern, tablePattern,
+                ignoreSingleTableErrors, inlongMetric, auditHostAndPorts, parallelism);
     }
 
     @Override
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
index 94b075ef5..1f688bebb 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
@@ -19,7 +19,6 @@ package org.apache.inlong.sort.doris.table;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.exception.StreamLoadException;
-import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.shaded.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.doris.shaded.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.shaded.org.apache.commons.codec.binary.Base64;
@@ -32,6 +31,7 @@ import org.apache.http.impl.client.DefaultRedirectStrategy;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
+import org.apache.inlong.sort.doris.model.RespContent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,7 +80,7 @@ public class DorisStreamLoad implements Serializable {
         this.httpClient = httpClientBuilder.build();
     }
 
-    public void load(String db, String tbl, String value) throws StreamLoadException {
+    public RespContent load(String db, String tbl, String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(db, tbl, value);
         LOG.info("Streamload Response:{}", loadResponse);
         if (loadResponse.status != 200) {
@@ -93,6 +93,7 @@ public class DorisStreamLoad implements Serializable {
                             respContent.getErrorURL());
                     throw new StreamLoadException(errMsg);
                 }
+                return respContent;
             } catch (IOException e) {
                 throw new StreamLoadException(e);
             }
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 53beba554..95793e251 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -608,6 +608,7 @@
          inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableFactory.java
          inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicTableSink.java
          inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisStreamLoad.java
+         inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/model/RespContent.java
     Source  : org.apache.doris:flink-doris-connector-1.13_2.11:1.0.3 (Please note that the software have been modified.)
     License : https://github.com/apache/doris-flink-connector/blob/1.13_2.11-1.0.3/LICENSE.txt