You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/08 06:08:48 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector-V2][Doris] Refactor some Doris Sink code as well as support 2pc and cdc (#4235)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7c4005af8 [Improve][Connector-V2][Doris] Refactor some Doris Sink code as well as support 2pc and cdc (#4235)
7c4005af8 is described below

commit 7c4005af85102db179c705ad1b5127dd619798c4
Author: yongkang.zhong <zh...@qq.com>
AuthorDate: Wed Mar 8 14:08:42 2023 +0800

    [Improve][Connector-V2][Doris] Refactor some Doris Sink code as well as support 2pc and cdc (#4235)
    
    * [Improve][Connector-V2][Doris]Refactor some Doris Sink code and support 2pc
    
    * fix package
    
    * code style
    
    * doc and test
    
    * doc style
    
    * Add cdc and override Serializer
    
    * fix
    
    * add test and fix code and fix doc
    
    * add doc
    
    * fix test
    
    * fix test2
    
    * fix test4
    
    * code style
    
    * fix test5
    
    * add abort
    
    * fix review
    
    * add release-note
    
    * fix
    
    * fix
    
    * fix
    
    * fix
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |   8 +-
 docs/en/connector-v2/sink/Doris.md                 | 113 ++--
 release-note.md                                    |   1 +
 seatunnel-connectors-v2/connector-doris/pom.xml    |   5 +
 .../connectors/doris/client/DorisSinkManager.java  | 172 ------
 .../doris/client/DorisStreamLoadVisitor.java       | 278 ---------
 .../connectors/doris/client/HttpHelper.java        | 180 ------
 .../connectors/doris/config/DorisConfig.java       | 333 ++++++++++
 .../connectors/doris/config/SinkConfig.java        | 216 -------
 .../doris/exception/DorisConnectorErrorCode.java   |   4 +-
 .../connectors/doris/rest/PartitionDefinition.java | 147 +++++
 .../connectors/doris/rest/RestService.java         | 680 +++++++++++++++++++++
 .../models/Backend.java}                           |  27 +-
 .../models/BackendRow.java}                        |  30 +-
 .../connectors/doris/rest/models/BackendV2.java    |  78 +++
 .../connectors/doris/rest/models/Field.java        | 134 ++++
 .../connectors/doris/rest/models/QueryPlan.java    |  70 +++
 .../models/RespContent.java}                       |  39 +-
 .../connectors/doris/rest/models/Schema.java       | 108 ++++
 .../connectors/doris/rest/models/Tablet.java       |  80 +++
 .../doris/serialize/DorisSerializer.java}          |  20 +-
 .../connectors/doris/serialize/DorisSinkOP.java}   |  20 +-
 .../doris/serialize/SeaTunnelRowConverter.java     |  73 +++
 .../doris/serialize/SeaTunnelRowSerializer.java    | 151 +++++
 .../seatunnel/connectors/doris/sink/DorisSink.java |  76 ++-
 .../connectors/doris/sink/DorisSinkFactory.java    |  54 --
 .../connectors/doris/sink/DorisSinkWriter.java     | 102 ----
 .../connectors/doris/sink/HttpPutBuilder.java      | 121 ++++
 .../DorisFlushTuple.java => sink/LoadStatus.java}  |  19 +-
 .../doris/sink/committer/DorisCommitInfo.java}     |  27 +-
 .../sink/committer/DorisCommitInfoSerializer.java  |  54 ++
 .../doris/sink/committer/DorisCommitter.java       | 179 ++++++
 .../doris/sink/writer/DorisSinkState.java}         |  23 +-
 .../sink/writer/DorisSinkStateSerializer.java      |  49 ++
 .../doris/sink/writer/DorisSinkWriter.java         | 273 +++++++++
 .../doris/sink/writer/DorisStreamLoad.java         | 301 +++++++++
 .../doris/sink/writer/LabelGenerator.java}         |  21 +-
 .../writer/LoadConstants.java}                     |  25 +-
 .../connectors/doris/sink/writer/RecordBuffer.java | 139 +++++
 .../connectors/doris/sink/writer/RecordStream.java |  60 ++
 .../connectors/doris/util/DelimiterParserUtil.java |  81 ---
 .../ErrorMessages.java}                            |  24 +-
 .../seatunnel/connectors/doris/util/HttpUtil.java  |  40 ++
 .../seatunnel/connectors/doris/util/IOUtils.java   |  49 ++
 .../ResponseUtil.java}                             |  34 +-
 .../connector-doris-e2e}/pom.xml                   |  42 +-
 .../e2e/connector/doris/DorisCDCSinkIT.java        | 178 ++++++
 .../resources/write-cdc-changelog-to-doris.conf    |  75 +++
 .../connectors/seatunnel/jdbc/JdbcDorisIT.java     |   8 +-
 .../src/test/resources/doris-jdbc-to-doris.conf    |  13 +-
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 51 files changed, 3703 insertions(+), 1332 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 9b54e555e..da0afa41d 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -161,9 +161,11 @@ problems encountered by users.
 
 ## Doris Connector Error Codes
 
-|   code   |           description            |                                                               solution                                                               |
-|----------|----------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|
-| Doris-01 | Writing records to Doris failed. | When users encounter this error code, it means that writing records to Doris failed, please check data from files whether is correct |
+|   code   |     description     |                                                             solution                                                              |
+|----------|---------------------|-----------------------------------------------------------------------------------------------------------------------------------|
+| Doris-01 | stream load error.  | When users encounter this error code, it means that stream load to Doris failed, please check whether data from files is correct. |
+| Doris-02 | commit error.       | When users encounter this error code, it means that commit to Doris failed, please check network.                                 |
+| Doris-03 | rest service error. | When users encounter this error code, it means that rest service failed, please check network and config.                         |
 
 ## SelectDB Cloud Connector Error Codes
 
diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index 47e893b22..acffad6a7 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -7,31 +7,37 @@
 Used to send data to Doris. Both support streaming and batch mode.
 The internal implementation of Doris sink connector is cached and imported by stream load in batches.
 
+:::tip
+
+Version Supported
+
+* exactly-once & cdc supported  `Doris version is >= 1.1.x`
+* Array data type supported  `Doris version is >= 1.2.x`
+* Map data type will be supported in `Doris version is 2.x`
+
+:::
+
 ## Key features
 
-- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
 
 ## Options
 
-|            name             |  type  | required |  default value  |
-|-----------------------------|--------|----------|-----------------|
-| node_urls                   | list   | yes      | -               |
-| username                    | string | yes      | -               |
-| password                    | string | yes      | -               |
-| database                    | string | yes      | -               |
-| table                       | string | yes      | -               |
-| labelPrefix                 | string | no       | -               |
-| batch_max_rows              | long   | no       | 1024            |
-| batch_max_bytes             | int    | no       | 5 * 1024 * 1024 |
-| batch_interval_ms           | int    | no       | 1000            |
-| max_retries                 | int    | no       | 1               |
-| retry_backoff_multiplier_ms | int    | no       | -               |
-| max_retry_backoff_ms        | int    | no       | -               |
-| doris.config                | map    | no       | -               |
-
-### node_urls [list]
-
-`Doris` cluster address, the format is `["fe_ip:fe_http_port", ...]`
+|        name        |  type  | required | default value |
+|--------------------|--------|----------|---------------|
+| fenodes            | string | yes      | -             |
+| username           | string | yes      | -             |
+| password           | string | yes      | -             |
+| table.identifier   | string | yes      | -             |
+| sink.label-prefix  | string | yes      | -             |
+| sink.enable-2pc    | bool   | no       | true          |
+| sink.enable-delete | bool   | no       | false         |
+| doris.config       | map    | yes      | -             |
+
+### fenodes [string]
+
+`Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."`
 
 ### username [string]
 
@@ -41,47 +47,29 @@ The internal implementation of Doris sink connector is cached and imported by st
 
 `Doris` user password
 
-### database [string]
-
-The name of `Doris` database
-
-### table [string]
+### table.identifier [string]
 
 The name of `Doris` table
 
-### labelPrefix [string]
-
-The prefix of `Doris` stream load label
+### sink.label-prefix [string]
 
-### batch_max_rows [long]
+The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel.
 
-For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
+### sink.enable-2pc [bool]
 
-### batch_max_bytes [int]
+Whether to enable two-phase commit (2pc), the default is true, to ensure Exactly-Once semantics. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD).
 
-For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
+### sink.enable-delete [bool]
 
-### batch_interval_ms [int]
+Whether to enable deletion. This option requires the Doris table to have the batch delete function enabled (it is enabled by default in version 0.15+), and only supports the Unique model. You can get more detail at this link:
 
-For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the Doris
-
-### max_retries [int]
-
-The number of retries to flush failed
-
-### retry_backoff_multiplier_ms [int]
-
-Using as a multiplier for generating the next delay for backoff
-
-### max_retry_backoff_ms [int]
-
-The amount of time to wait before attempting to retry a request to `Doris`
+https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual
 
 ### doris.config [map]
 
 The parameter of the stream load `data_desc`, you can get more detail at this link:
 
-https://doris.apache.org/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD/
+https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD
 
 #### Supported import data formats
 
@@ -94,15 +82,15 @@ Use JSON format to import data
 ```
 sink {
     Doris {
-        nodeUrls = ["e2e_dorisdb:8030"]
+        fenodes = "e2e_dorisdb:8030"
         username = root
         password = ""
-        database = "test"
-        table = "e2e_table_sink"
-        batch_max_rows = 100
+        table.identifier = "test.e2e_table_sink"
+        sink.enable-2pc = "true"
+        sink.label-prefix = "test_json"
         doris.config = {
-          format = "JSON"
-          strip_outer_array = true
+            format="json"
+            read_json_by_line="true"
         }
     }
 }
@@ -114,16 +102,14 @@ Use CSV format to import data
 ```
 sink {
     Doris {
-        nodeUrls = ["e2e_dorisdb:8030"]
+        fenodes = "e2e_dorisdb:8030"
         username = root
         password = ""
-        database = "test"
-        table = "e2e_table_sink"
-        batch_max_rows = 100
-        sink.properties.format = "CSV"
-        sink.properties.column_separator = ","
+        table.identifier = "test.e2e_table_sink"
+        sink.enable-2pc = "true"
+        sink.label-prefix = "test_csv"
         doris.config = {
-          format = "CSV"
+          format = "csv"
           column_separator = ","
         }
     }
@@ -140,3 +126,10 @@ sink {
 
 - [Improve] Change Doris Config Prefix [3856](https://github.com/apache/incubator-seatunnel/pull/3856)
 
+- [Improve] Refactor some Doris Sink code as well as support 2pc and cdc [4235](https://github.com/apache/incubator-seatunnel/pull/4235)
+
+:::tip
+
+PR 4235 is an incompatible modification to PR 3856. Please refer to PR 4235 to use the new Doris connector.
+
+:::
diff --git a/release-note.md b/release-note.md
index dcf38096c..36a7d3762 100644
--- a/release-note.md
+++ b/release-note.md
@@ -44,6 +44,7 @@
 - [File] Support column projection #4105
 - [Github] Add github source connector #4155
 - [Jdbc] Add database field to sink config #4199
+- [Doris] Refactor some Doris Sink code as well as support 2pc and cdc #4235
 ### Zeta Engine
 - [Chore] Remove unnecessary dependencies #3795
 - [Core] Improve job restart of all node down #3784
diff --git a/seatunnel-connectors-v2/connector-doris/pom.xml b/seatunnel-connectors-v2/connector-doris/pom.xml
index f06550674..b40a84925 100644
--- a/seatunnel-connectors-v2/connector-doris/pom.xml
+++ b/seatunnel-connectors-v2/connector-doris/pom.xml
@@ -65,5 +65,10 @@
             <artifactId>seatunnel-format-text</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>${commons-io.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java
deleted file mode 100644
index bccc7da9a..000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.client;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.doris.config.SinkConfig;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-
-import com.google.common.base.Strings;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-@Slf4j
-public class DorisSinkManager {
-
-    private final SinkConfig sinkConfig;
-    private final List<byte[]> batchList;
-
-    private final DorisStreamLoadVisitor dorisStreamLoadVisitor;
-    private ScheduledExecutorService scheduler;
-    private ScheduledFuture<?> scheduledFuture;
-    private volatile boolean initialize;
-    private volatile Exception flushException;
-    private int batchRowCount = 0;
-    private long batchBytesSize = 0;
-
-    private final Integer batchIntervalMs;
-
-    public DorisSinkManager(SinkConfig sinkConfig, List<String> fileNames) {
-        this.sinkConfig = sinkConfig;
-        this.batchList = new ArrayList<>();
-        this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
-        dorisStreamLoadVisitor = new DorisStreamLoadVisitor(sinkConfig, fileNames);
-    }
-
-    private void tryInit() throws IOException {
-        if (initialize) {
-            return;
-        }
-        initialize = true;
-
-        scheduler =
-                Executors.newSingleThreadScheduledExecutor(
-                        new ThreadFactoryBuilder().setNameFormat("Doris-sink-output-%s").build());
-        scheduledFuture =
-                scheduler.scheduleAtFixedRate(
-                        () -> {
-                            try {
-                                flush();
-                            } catch (IOException e) {
-                                flushException = e;
-                            }
-                        },
-                        batchIntervalMs,
-                        batchIntervalMs,
-                        TimeUnit.MILLISECONDS);
-    }
-
-    public synchronized void write(String record) throws IOException {
-        tryInit();
-        checkFlushException();
-        byte[] bts = record.getBytes(StandardCharsets.UTF_8);
-        batchList.add(bts);
-        batchRowCount++;
-        batchBytesSize += bts.length;
-        if (batchRowCount >= sinkConfig.getBatchMaxSize()
-                || batchBytesSize >= sinkConfig.getBatchMaxBytes()) {
-            flush();
-        }
-    }
-
-    public synchronized void close() throws IOException {
-        if (scheduledFuture != null) {
-            scheduledFuture.cancel(false);
-            scheduler.shutdown();
-        }
-
-        flush();
-    }
-
-    public synchronized void flush() throws IOException {
-        checkFlushException();
-        if (batchList.isEmpty()) {
-            return;
-        }
-        String label = createBatchLabel();
-        DorisFlushTuple tuple = new DorisFlushTuple(label, batchBytesSize, batchList);
-        for (int i = 0; i <= sinkConfig.getMaxRetries(); i++) {
-            try {
-                Boolean successFlag = dorisStreamLoadVisitor.doStreamLoad(tuple);
-                if (successFlag) {
-                    break;
-                }
-            } catch (Exception e) {
-                log.warn("Writing records to Doris failed, retry times = {}", i, e);
-                if (i >= sinkConfig.getMaxRetries()) {
-                    throw new DorisConnectorException(
-                            DorisConnectorErrorCode.WRITE_RECORDS_FAILED,
-                            "The number of retries was exceeded,writing records to Doris failed.",
-                            e);
-                }
-
-                if (e instanceof DorisConnectorException
-                        && ((DorisConnectorException) e).needReCreateLabel()) {
-                    String newLabel = createBatchLabel();
-                    log.warn(
-                            String.format(
-                                    "Batch label changed from [%s] to [%s]",
-                                    tuple.getLabel(), newLabel));
-                    tuple.setLabel(newLabel);
-                }
-
-                try {
-                    long backoff =
-                            Math.min(
-                                    sinkConfig.getRetryBackoffMultiplierMs() * i,
-                                    sinkConfig.getMaxRetryBackoffMs());
-                    Thread.sleep(backoff);
-                } catch (InterruptedException ex) {
-                    Thread.currentThread().interrupt();
-                    throw new DorisConnectorException(
-                            CommonErrorCode.FLUSH_DATA_FAILED,
-                            "Unable to flush, interrupted while doing another attempt.",
-                            e);
-                }
-            }
-        }
-        batchList.clear();
-        batchRowCount = 0;
-        batchBytesSize = 0;
-    }
-
-    private void checkFlushException() {
-        if (flushException != null) {
-            throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, flushException);
-        }
-    }
-
-    public String createBatchLabel() {
-        String labelPrefix = "";
-        if (!Strings.isNullOrEmpty(sinkConfig.getLabelPrefix())) {
-            labelPrefix = sinkConfig.getLabelPrefix();
-        }
-        return String.format("%s%s", labelPrefix, UUID.randomUUID().toString());
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java
deleted file mode 100644
index 5535bee5d..000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisStreamLoadVisitor.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.client;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.common.utils.JsonUtils;
-import org.apache.seatunnel.connectors.doris.config.SinkConfig;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.util.DelimiterParserUtil;
-
-import org.apache.commons.codec.binary.Base64;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class DorisStreamLoadVisitor {
-    private final HttpHelper httpHelper = new HttpHelper();
-    private static final int MAX_SLEEP_TIME = 5;
-
-    private final SinkConfig sinkConfig;
-    private long pos;
-    private static final String RESULT_FAILED = "Fail";
-    private static final String RESULT_SUCCESS = "Success";
-    private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
-    private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
-    private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
-    private static final String RESULT_LABEL_PREPARE = "PREPARE";
-    private static final String RESULT_LABEL_ABORTED = "ABORTED";
-    private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
-
-    private List<String> fieldNames;
-
-    public DorisStreamLoadVisitor(SinkConfig sinkConfig, List<String> fieldNames) {
-        this.sinkConfig = sinkConfig;
-        this.fieldNames = fieldNames;
-    }
-
-    public Boolean doStreamLoad(DorisFlushTuple flushData) throws IOException {
-        String host = getAvailableHost();
-        if (null == host) {
-            throw new DorisConnectorException(
-                    CommonErrorCode.ILLEGAL_ARGUMENT,
-                    "None of the host in `load_url` could be connected.");
-        }
-        String loadUrl =
-                String.format(
-                        "%s/api/%s/%s/_stream_load",
-                        host, sinkConfig.getDatabase(), sinkConfig.getTable());
-        if (log.isDebugEnabled()) {
-            log.debug(
-                    String.format(
-                            "Start to join batch data: rows[%d] bytes[%d] label[%s].",
-                            flushData.getRows().size(),
-                            flushData.getBytes(),
-                            flushData.getLabel()));
-        }
-        Map<String, Object> loadResult =
-                httpHelper.doHttpPut(
-                        loadUrl,
-                        joinRows(flushData.getRows(), flushData.getBytes().intValue()),
-                        getStreamLoadHttpHeader(flushData.getLabel()));
-        final String keyStatus = "Status";
-        if (null == loadResult || !loadResult.containsKey(keyStatus)) {
-            throw new DorisConnectorException(
-                    CommonErrorCode.FLUSH_DATA_FAILED,
-                    "Unable to flush data to Doris: unknown result status. " + loadResult);
-        }
-        if (log.isDebugEnabled()) {
-            log.debug(
-                    String.format("StreamLoad response:\n%s"), JsonUtils.toJsonString(loadResult));
-        }
-        if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
-            String errorMsg = "Failed to flush data to Doris.\n";
-            String message = "";
-            if (loadResult.containsKey("Message")) {
-                message = loadResult.get("Message") + "\n";
-            }
-            String errorURL = "";
-            if (loadResult.containsKey("ErrorURL")) {
-                try {
-                    errorURL = httpHelper.doHttpGet(loadResult.get("ErrorURL").toString()) + "\n";
-                } catch (IOException e) {
-                    log.warn("Get Error URL failed. {} ", loadResult.get("ErrorURL"), e);
-                }
-            } else {
-                errorURL = JsonUtils.toJsonString(loadResult) + "\n";
-            }
-            throw new DorisConnectorException(
-                    CommonErrorCode.FLUSH_DATA_FAILED,
-                    String.format("%s%s%s", errorMsg, message, errorURL));
-        } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
-            log.debug(
-                    String.format("StreamLoad response:\n%s"), JsonUtils.toJsonString(loadResult));
-            // has to block-checking the state to get the final result
-            checkLabelState(host, flushData.getLabel());
-        }
-        return RESULT_SUCCESS.equals(loadResult.get(keyStatus));
-    }
-
-    private String getAvailableHost() {
-        List<String> hostList = sinkConfig.getNodeUrls();
-        long tmp = pos + hostList.size();
-        for (; pos < tmp; pos++) {
-            String host = String.format("http://%s", hostList.get((int) (pos % hostList.size())));
-            if (httpHelper.tryHttpConnection(host)) {
-                return host;
-            }
-        }
-        return null;
-    }
-
-    private byte[] joinRows(List<byte[]> rows, int totalBytes) {
-        if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
-            Map<String, String> props = sinkConfig.getStreamLoadProps();
-            byte[] lineDelimiter =
-                    DelimiterParserUtil.parse((String) props.get("row_delimiter"), "\n")
-                            .getBytes(StandardCharsets.UTF_8);
-            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
-            for (byte[] row : rows) {
-                bos.put(row);
-                bos.put(lineDelimiter);
-            }
-            return bos.array();
-        }
-
-        if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) {
-            ByteBuffer bos =
-                    ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
-            bos.put("[".getBytes(StandardCharsets.UTF_8));
-            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
-            boolean isFirstElement = true;
-            for (byte[] row : rows) {
-                if (!isFirstElement) {
-                    bos.put(jsonDelimiter);
-                }
-                bos.put(row);
-                isFirstElement = false;
-            }
-            bos.put("]".getBytes(StandardCharsets.UTF_8));
-            return bos.array();
-        }
-        throw new DorisConnectorException(
-                CommonErrorCode.FLUSH_DATA_FAILED,
-                "Failed to join rows data, unsupported `format` from stream load properties:");
-    }
-
-    @SuppressWarnings("unchecked")
-    private void checkLabelState(String host, String label) throws IOException {
-        int idx = 0;
-        while (true) {
-            try {
-                TimeUnit.SECONDS.sleep(Math.min(++idx, MAX_SLEEP_TIME));
-            } catch (InterruptedException ex) {
-                break;
-            }
-            try {
-                String queryLoadStateUrl =
-                        String.format(
-                                "%s/api/%s/get_load_state?label=%s",
-                                host, sinkConfig.getDatabase(), label);
-                Map<String, Object> result =
-                        httpHelper.doHttpGet(queryLoadStateUrl, getLoadStateHttpHeader(label));
-                if (result == null) {
-                    throw new DorisConnectorException(
-                            CommonErrorCode.FLUSH_DATA_FAILED,
-                            String.format(
-                                    "Failed to flush data to Doris, Error "
-                                            + "could not get the final state of label[%s].\n",
-                                    label),
-                            null);
-                }
-                String labelState = (String) result.get("state");
-                if (null == labelState) {
-                    throw new DorisConnectorException(
-                            CommonErrorCode.FLUSH_DATA_FAILED,
-                            String.format(
-                                    "Failed to flush data to Doris, Error "
-                                            + "could not get the final state of label[%s]. response[%s]\n",
-                                    label, JsonUtils.toJsonString(result)),
-                            null);
-                }
-                log.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
-                switch (labelState) {
-                    case LAEBL_STATE_VISIBLE:
-                    case LAEBL_STATE_COMMITTED:
-                        return;
-                    case RESULT_LABEL_PREPARE:
-                        continue;
-                    case RESULT_LABEL_ABORTED:
-                        throw new DorisConnectorException(
-                                CommonErrorCode.FLUSH_DATA_FAILED,
-                                String.format(
-                                        "Failed to flush data to Doris, Error "
-                                                + "label[%s] state[%s]\n",
-                                        label, labelState),
-                                true);
-                    case RESULT_LABEL_UNKNOWN:
-                    default:
-                        throw new DorisConnectorException(
-                                CommonErrorCode.FLUSH_DATA_FAILED,
-                                String.format(
-                                        "Failed to flush data to Doris, Error "
-                                                + "label[%s] state[%s]\n",
-                                        label, labelState));
-                }
-            } catch (IOException e) {
-                throw new DorisConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, e);
-            }
-        }
-    }
-
-    private String getBasicAuthHeader(String username, String password) {
-        String auth = username + ":" + password;
-        byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
-        return String.format("Basic %s", new String(encodedAuth));
-    }
-
-    private Map<String, String> getStreamLoadHttpHeader(String label) {
-        Map<String, String> headerMap = new HashMap<>();
-        if (null != fieldNames
-                && !fieldNames.isEmpty()
-                && SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
-            headerMap.put(
-                    "columns",
-                    String.join(
-                            ",",
-                            fieldNames.stream()
-                                    .map(f -> String.format("`%s`", f))
-                                    .collect(Collectors.toList())));
-        }
-        if (null != sinkConfig.getStreamLoadProps()) {
-            for (Map.Entry<String, String> entry : sinkConfig.getStreamLoadProps().entrySet()) {
-                headerMap.put(entry.getKey(), String.valueOf(entry.getValue()));
-            }
-        }
-        headerMap.put("strip_outer_array", "true");
-        headerMap.put("Expect", "100-continue");
-        headerMap.put("label", label);
-        headerMap.put("Content-Type", "application/x-www-form-urlencoded");
-        headerMap.put(
-                "Authorization",
-                getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword()));
-        return headerMap;
-    }
-
-    private Map<String, String> getLoadStateHttpHeader(String label) {
-        Map<String, String> headerMap = new HashMap<>();
-        headerMap.put(
-                "Authorization",
-                getBasicAuthHeader(sinkConfig.getUsername(), sinkConfig.getPassword()));
-        headerMap.put("Connection", "close");
-        return headerMap;
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java
deleted file mode 100644
index bf39f2c7c..000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/HttpHelper.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.client;
-
-import org.apache.seatunnel.common.utils.JsonUtils;
-
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpRequestInterceptor;
-import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.methods.HttpPut;
-import org.apache.http.entity.ByteArrayEntity;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.DefaultRedirectStrategy;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.protocol.HTTP;
-import org.apache.http.util.EntityUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.HashMap;
-import java.util.Map;
-
-@Slf4j
-public class HttpHelper {
-    private static final int DEFAULT_CONNECT_TIMEOUT = 1000000;
-
-    public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
-        int code = resp.getStatusLine().getStatusCode();
-        if (HttpStatus.SC_OK != code) {
-            log.warn("Request failed with code:{}", code);
-            return null;
-        }
-        HttpEntity respEntity = resp.getEntity();
-        if (null == respEntity) {
-            log.warn("Request failed with empty response.");
-            return null;
-        }
-        return respEntity;
-    }
-
-    public String doHttpGet(String getUrl) throws IOException {
-        log.info("Executing GET from {}.", getUrl);
-        try (CloseableHttpClient httpclient = buildHttpClient()) {
-            HttpGet httpGet = new HttpGet(getUrl);
-            try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
-                HttpEntity respEntity = resp.getEntity();
-                if (null == respEntity) {
-                    log.warn("Request failed with empty response.");
-                    return null;
-                }
-                return EntityUtils.toString(respEntity);
-            }
-        }
-    }
-
-    public Map<String, Object> doHttpGet(String getUrl, Map<String, String> header)
-            throws IOException {
-        log.info("Executing GET from {}.", getUrl);
-        try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
-            HttpGet httpGet = new HttpGet(getUrl);
-            if (null != header) {
-                for (Map.Entry<String, String> entry : header.entrySet()) {
-                    httpGet.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
-                }
-            }
-            try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
-                HttpEntity respEntity = getHttpEntity(resp);
-                if (null == respEntity) {
-                    log.warn("Request failed with empty response.");
-                    return null;
-                }
-                return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public Map<String, Object> doHttpPut(String url, byte[] data, Map<String, String> header)
-            throws IOException {
-        final HttpClientBuilder httpClientBuilder =
-                HttpClients.custom()
-                        .addInterceptorFirst(
-                                (HttpRequestInterceptor)
-                                        (request, context) -> {
-                                            // fighting org.apache.http.protocol.RequestContent's
-                                            // ProtocolException("Content-Length header already
-                                            // present");
-                                            request.removeHeaders(HTTP.CONTENT_LEN);
-                                        })
-                        .setRedirectStrategy(
-                                new DefaultRedirectStrategy() {
-                                    @Override
-                                    protected boolean isRedirectable(String method) {
-                                        return true;
-                                    }
-                                });
-        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
-            HttpPut httpPut = new HttpPut(url);
-            if (null != header) {
-                for (Map.Entry<String, String> entry : header.entrySet()) {
-                    httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
-                }
-            }
-            httpPut.setEntity(new ByteArrayEntity(data));
-            httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
-            try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
-                int code = resp.getStatusLine().getStatusCode();
-                if (HttpStatus.SC_OK != code) {
-                    String errorText;
-                    try {
-                        HttpEntity respEntity = resp.getEntity();
-                        errorText = EntityUtils.toString(respEntity);
-                    } catch (Exception err) {
-                        errorText = "find errorText failed: " + err.getMessage();
-                    }
-                    log.warn("Request failed with code:{}, err:{}", code, errorText);
-                    Map<String, Object> errorMap = new HashMap<>();
-                    errorMap.put("Status", "Fail");
-                    errorMap.put("Message", errorText);
-                    return errorMap;
-                }
-                HttpEntity respEntity = resp.getEntity();
-                if (null == respEntity) {
-                    log.warn("Request failed with empty response.");
-                    return null;
-                }
-                return JsonUtils.parseObject(EntityUtils.toString(respEntity), Map.class);
-            }
-        }
-    }
-
-    private CloseableHttpClient buildHttpClient() {
-        final HttpClientBuilder httpClientBuilder =
-                HttpClients.custom()
-                        .setRedirectStrategy(
-                                new DefaultRedirectStrategy() {
-                                    @Override
-                                    protected boolean isRedirectable(String method) {
-                                        return true;
-                                    }
-                                });
-        return httpClientBuilder.build();
-    }
-
-    public boolean tryHttpConnection(String host) {
-        try {
-            URL url = new URL(host);
-            HttpURLConnection co = (HttpURLConnection) url.openConnection();
-            co.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT);
-            co.connect();
-            co.disconnect();
-            return true;
-        } catch (Exception e1) {
-            log.warn("Failed to connect to address:{}", host, e1);
-            return false;
-        }
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
new file mode 100644
index 000000000..d58ea0757
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisConfig.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.Map;
+import java.util.Properties;
+
+@Setter
+@Getter
+@ToString
+public class DorisConfig {
+    public static final int DORIS_TABLET_SIZE_MIN = 1;
+    public static final int DORIS_TABLET_SIZE_DEFAULT = Integer.MAX_VALUE;
+    public static final int DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT = 30 * 1000;
+    public static final int DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT = 30 * 1000;
+    private static final int DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT = 3600;
+    public static final int DORIS_REQUEST_RETRIES_DEFAULT = 3;
+    private static final Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
+    private static final int DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;
+    private static final int DORIS_BATCH_SIZE_DEFAULT = 1024;
+    private static final long DORIS_EXEC_MEM_LIMIT_DEFAULT = 2147483648L;
+    private static final int DEFAULT_SINK_CHECK_INTERVAL = 10000;
+    private static final int DEFAULT_SINK_MAX_RETRIES = 3;
+    private static final int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024;
+    private static final int DEFAULT_SINK_BUFFER_COUNT = 3;
+    // common option
+    public static final Option<String> FENODES =
+            Options.key("fenodes")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("doris fe http address.");
+
+    public static final Option<String> TABLE_IDENTIFIER =
+            Options.key("table.identifier")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the doris table name.");
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the doris user name.");
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("the doris password.");
+
+    // source config options
+    public static final Option<String> DORIS_READ_FIELD =
+            Options.key("doris.read.field")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "List of column names in the Doris table, separated by commas");
+    public static final Option<String> DORIS_FILTER_QUERY =
+            Options.key("doris.filter.query")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
+    public static final Option<Integer> DORIS_TABLET_SIZE =
+            Options.key("doris.request.tablet.size")
+                    .intType()
+                    .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+                    .withDescription("");
+    public static final Option<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS =
+            Options.key("doris.request.connect.timeout.ms")
+                    .intType()
+                    .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+                    .withDescription("");
+    public static final Option<Integer> DORIS_REQUEST_READ_TIMEOUT_MS =
+            Options.key("doris.request.read.timeout.ms")
+                    .intType()
+                    .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+                    .withDescription("");
+    public static final Option<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S =
+            Options.key("doris.request.query.timeout.s")
+                    .intType()
+                    .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+                    .withDescription("");
+    public static final Option<Integer> DORIS_REQUEST_RETRIES =
+            Options.key("doris.request.retries")
+                    .intType()
+                    .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+                    .withDescription("");
+    public static final Option<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC =
+            Options.key("doris.deserialize.arrow.async")
+                    .booleanType()
+                    .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+                    .withDescription("");
+    // Key mirrors the "doris.deserialize.*" prefix used by DORIS_DESERIALIZE_ARROW_ASYNC.
+    public static final Option<Integer> DORIS_DESERIALIZE_QUEUE_SIZE =
+            Options.key("doris.deserialize.queue.size").intType()
+                    .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+                    .withDescription("");
+    public static final Option<Integer> DORIS_BATCH_SIZE =
+            Options.key("doris.batch.size")
+                    .intType()
+                    .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+                    .withDescription("");
+    public static final Option<Long> DORIS_EXEC_MEM_LIMIT =
+            Options.key("doris.exec.mem.limit")
+                    .longType()
+                    .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+                    .withDescription("");
+    public static final Option<Boolean> SOURCE_USE_OLD_API =
+            Options.key("source.use-old-api")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to read data using the new interface defined according to the FLIP-27 specification,default false");
+
+    // sink config options
+    public static final Option<Boolean> SINK_ENABLE_2PC =
+            Options.key("sink.enable-2pc")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("enable 2PC while loading");
+
+    public static final Option<Integer> SINK_CHECK_INTERVAL =
+            Options.key("sink.check-interval")
+                    .intType()
+                    .defaultValue(DEFAULT_SINK_CHECK_INTERVAL)
+                    .withDescription("check exception with the interval while loading");
+    public static final Option<Integer> SINK_MAX_RETRIES =
+            Options.key("sink.max-retries")
+                    .intType()
+                    .defaultValue(DEFAULT_SINK_MAX_RETRIES)
+                    .withDescription("the max retry times if writing records to database failed.");
+    public static final Option<Integer> SINK_BUFFER_SIZE =
+            Options.key("sink.buffer-size")
+                    .intType()
+                    .defaultValue(DEFAULT_SINK_BUFFER_SIZE)
+                    .withDescription("the buffer size to cache data for stream load.");
+    public static final Option<Integer> SINK_BUFFER_COUNT =
+            Options.key("sink.buffer-count")
+                    .intType()
+                    .defaultValue(DEFAULT_SINK_BUFFER_COUNT)
+                    .withDescription("the buffer count to cache data for stream load.");
+    public static final Option<String> SINK_LABEL_PREFIX =
+            Options.key("sink.label-prefix")
+                    .stringType()
+                    .defaultValue("")
+                    .withDescription("the unique label prefix.");
+    public static final Option<Boolean> SINK_ENABLE_DELETE =
+            Options.key("sink.enable-delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("whether to enable the delete function");
+
+    public static final Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX =
+            Options.key("doris.config")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The parameter of the Stream Load data_desc. "
+                                    + "The way to specify the parameter is to add the prefix `doris.config` to the original load parameter name ");
+
+    // common option
+    private String frontends;
+    private String username;
+    private String password;
+    private String tableIdentifier;
+
+    // source option. NOTE(review): execMemLimit and useOldApi are never set by loadConfig.
+    private String readField;
+    private String filterQuery;
+    private Integer tabletSize;
+    private Integer requestConnectTimeoutMs;
+    private Integer requestReadTimeoutMs;
+    private Integer requestQueryTimeoutS;
+    private Integer requestRetries;
+    private boolean deserializeArrowAsync;
+    private int deserializeQueueSize;
+    private int batchSize;
+    private int execMemLimit; // NOTE(review): option is Long (default 2147483648L)
+    private boolean useOldApi;
+
+    // sink option
+    private Boolean enable2PC;
+    private Boolean enableDelete;
+    private String labelPrefix;
+    private Integer checkInterval;
+    private Integer maxRetries;
+    private Integer bufferSize;
+    private Integer bufferCount;
+    private Properties streamLoadProps;
+
+    /**
+     * Builds a {@link DorisConfig} from the plugin configuration.
+     *
+     * <p>The common options are read without a {@code hasPath} guard, and the stream load
+     * properties are collected by {@link #parseStreamLoadProperties(Config)}. Every source and
+     * sink option handled here falls back to the default declared on its {@link Option} when the
+     * key is absent from {@code pluginConfig}.
+     *
+     * <p>{@code doris.exec.mem.limit} and {@code source.use-old-api} are declared as options but
+     * are not read here, so {@code execMemLimit} and {@code useOldApi} keep their field defaults.
+     *
+     * @param pluginConfig the connector's configuration block
+     * @return a new, populated {@link DorisConfig}
+     */
+    public static DorisConfig loadConfig(Config pluginConfig) {
+        DorisConfig dorisConfig = new DorisConfig();
+
+        // common option
+        // (none of these declares a default value, so there is nothing to fall back to)
+        dorisConfig.setFrontends(pluginConfig.getString(FENODES.key()));
+        dorisConfig.setUsername(pluginConfig.getString(USERNAME.key()));
+        dorisConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
+        dorisConfig.setTableIdentifier(pluginConfig.getString(TABLE_IDENTIFIER.key()));
+        dorisConfig.setStreamLoadProps(parseStreamLoadProperties(pluginConfig));
+
+        // source option
+        dorisConfig.setReadField(stringOrDefault(pluginConfig, DORIS_READ_FIELD));
+        dorisConfig.setFilterQuery(stringOrDefault(pluginConfig, DORIS_FILTER_QUERY));
+        dorisConfig.setTabletSize(intOrDefault(pluginConfig, DORIS_TABLET_SIZE));
+        // The connect timeout has its own field. Routing it through the read-timeout setter
+        // would leave requestConnectTimeoutMs unset, because the read timeout is assigned below.
+        dorisConfig.setRequestConnectTimeoutMs(
+                intOrDefault(pluginConfig, DORIS_REQUEST_CONNECT_TIMEOUT_MS));
+        dorisConfig.setRequestQueryTimeoutS(
+                intOrDefault(pluginConfig, DORIS_REQUEST_QUERY_TIMEOUT_S));
+        dorisConfig.setRequestReadTimeoutMs(
+                intOrDefault(pluginConfig, DORIS_REQUEST_READ_TIMEOUT_MS));
+        dorisConfig.setRequestRetries(intOrDefault(pluginConfig, DORIS_REQUEST_RETRIES));
+        dorisConfig.setDeserializeArrowAsync(
+                booleanOrDefault(pluginConfig, DORIS_DESERIALIZE_ARROW_ASYNC));
+        dorisConfig.setDeserializeQueueSize(
+                intOrDefault(pluginConfig, DORIS_DESERIALIZE_QUEUE_SIZE));
+        // The batch size is stored in batchSize; it must not overwrite deserializeQueueSize.
+        dorisConfig.setBatchSize(intOrDefault(pluginConfig, DORIS_BATCH_SIZE));
+
+        // sink option
+        dorisConfig.setEnable2PC(booleanOrDefault(pluginConfig, SINK_ENABLE_2PC));
+        dorisConfig.setLabelPrefix(stringOrDefault(pluginConfig, SINK_LABEL_PREFIX));
+        dorisConfig.setCheckInterval(intOrDefault(pluginConfig, SINK_CHECK_INTERVAL));
+        dorisConfig.setMaxRetries(intOrDefault(pluginConfig, SINK_MAX_RETRIES));
+        dorisConfig.setBufferSize(intOrDefault(pluginConfig, SINK_BUFFER_SIZE));
+        dorisConfig.setBufferCount(intOrDefault(pluginConfig, SINK_BUFFER_COUNT));
+        dorisConfig.setEnableDelete(booleanOrDefault(pluginConfig, SINK_ENABLE_DELETE));
+        return dorisConfig;
+    }
+
+    /**
+     * Reads a string option, falling back to the option's declared default.
+     *
+     * @param pluginConfig configuration to read from
+     * @param option option supplying the key and the default value
+     * @return the configured value, or the default (which may be {@code null}) if the key is
+     *     absent
+     */
+    private static String stringOrDefault(Config pluginConfig, Option<String> option) {
+        if (pluginConfig.hasPath(option.key())) {
+            return pluginConfig.getString(option.key());
+        }
+        return option.defaultValue();
+    }
+
+    /**
+     * Reads an integer option, falling back to the option's declared default.
+     *
+     * <p>Written as an {@code if} rather than a conditional expression so the boxed default is
+     * returned as-is instead of being unboxed.
+     *
+     * @param pluginConfig configuration to read from
+     * @param option option supplying the key and the default value
+     * @return the configured value, or the default if the key is absent
+     */
+    private static Integer intOrDefault(Config pluginConfig, Option<Integer> option) {
+        if (pluginConfig.hasPath(option.key())) {
+            return pluginConfig.getInt(option.key());
+        }
+        return option.defaultValue();
+    }
+
+    /**
+     * Reads a boolean option, falling back to the option's declared default.
+     *
+     * <p>Written as an {@code if} rather than a conditional expression so the boxed default is
+     * returned as-is instead of being unboxed.
+     *
+     * @param pluginConfig configuration to read from
+     * @param option option supplying the key and the default value
+     * @return the configured value, or the default if the key is absent
+     */
+    private static Boolean booleanOrDefault(Config pluginConfig, Option<Boolean> option) {
+        if (pluginConfig.hasPath(option.key())) {
+            return pluginConfig.getBoolean(option.key());
+        }
+        return option.defaultValue();
+    }
+
+    /** Copies each entry under {@code doris.config} into a Properties, lower-casing its key. */
+    private static Properties parseStreamLoadProperties(Config pluginConfig) {
+        final Properties props = new Properties();
+        final String path = DORIS_SINK_CONFIG_PREFIX.key();
+        if (!CheckConfigUtil.isValidParam(pluginConfig, path)) {
+            // CheckConfigUtil did not accept the path: hand back the empty Properties.
+            return props;
+        }
+        // Each value is unwrapped from its config wrapper and stored as a string.
+        pluginConfig.getObject(path).forEach(
+                (name, raw) -> props.put(name.toLowerCase(), raw.unwrapped().toString()));
+        return props;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java
deleted file mode 100644
index 1975642a8..000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/SinkConfig.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.config;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-
-import lombok.Data;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@Data
-public class SinkConfig {
-
-    private static final int DEFAULT_BATCH_MAX_SIZE = 1024;
-    private static final int DEFAULT_BATCH_INTERVAL_MS = 1000;
-    private static final long DEFAULT_BATCH_BYTES = 5 * 1024 * 1024;
-
-    private static final String LOAD_FORMAT = "format";
-    private static final StreamLoadFormat DEFAULT_LOAD_FORMAT = StreamLoadFormat.CSV;
-    private static final String COLUMN_SEPARATOR = "column_separator";
-
-    public static final Option<List<String>> NODE_URLS =
-            Options.key("nodeUrls")
-                    .listType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Doris cluster address, the format is [\"fe_ip:fe_http_port\", ...]");
-
-    public static final Option<String> USERNAME =
-            Options.key("username")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Doris user username");
-
-    public static final Option<String> PASSWORD =
-            Options.key("password")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("Doris user password");
-
-    public static final Option<String> LABEL_PREFIX =
-            Options.key("labelPrefix")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The prefix of Doris stream load label");
-
-    public static final Option<String> DATABASE =
-            Options.key("database")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The name of Doris database");
-
-    public static final Option<String> TABLE =
-            Options.key("table")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The name of Doris table");
-
-    public static final Option<Map<String, String>> DORIS_CONFIG =
-            Options.key("doris.config")
-                    .mapType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The parameter of the stream load data_desc. "
-                                    + "The way to specify the parameter is to add the original stream load parameter into map");
-
-    public static final Option<Integer> BATCH_MAX_SIZE =
-            Options.key("batch_max_rows")
-                    .intType()
-                    .defaultValue(DEFAULT_BATCH_MAX_SIZE)
-                    .withDescription(
-                            "For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the Doris");
-
-    public static final Option<Long> BATCH_MAX_BYTES =
-            Options.key("batch_max_bytes")
-                    .longType()
-                    .defaultValue(DEFAULT_BATCH_BYTES)
-                    .withDescription(
-                            "For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the Doris");
-
-    public static final Option<Integer> BATCH_INTERVAL_MS =
-            Options.key("batch_interval_ms")
-                    .intType()
-                    .defaultValue(DEFAULT_BATCH_INTERVAL_MS)
-                    .withDescription(
-                            "For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the Doris");
-
-    public static final Option<Integer> MAX_RETRIES =
-            Options.key("max_retries")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription("The number of retries to flush failed");
-
-    public static final Option<Integer> RETRY_BACKOFF_MULTIPLIER_MS =
-            Options.key("retry_backoff_multiplier_ms")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Using as a multiplier for generating the next delay for backoff");
-
-    public static final Option<Integer> MAX_RETRY_BACKOFF_MS =
-            Options.key("max_retry_backoff_ms")
-                    .intType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "The amount of time to wait before attempting to retry a request to Doris");
-
-    public enum StreamLoadFormat {
-        CSV,
-        JSON;
-
-        public static StreamLoadFormat parse(String format) {
-            if (StreamLoadFormat.JSON.name().equals(format)) {
-                return JSON;
-            }
-            return CSV;
-        }
-    }
-
-    private List<String> nodeUrls;
-    private String username;
-    private String password;
-    private String database;
-    private String table;
-    private String labelPrefix;
-    private String columnSeparator;
-    private StreamLoadFormat loadFormat = DEFAULT_LOAD_FORMAT;
-
-    private int batchMaxSize = DEFAULT_BATCH_MAX_SIZE;
-    private long batchMaxBytes = DEFAULT_BATCH_BYTES;
-    private int batchIntervalMs = DEFAULT_BATCH_INTERVAL_MS;
-    private int maxRetries;
-    private int retryBackoffMultiplierMs;
-    private int maxRetryBackoffMs;
-
-    private final Map<String, String> streamLoadProps = new HashMap<>();
-
-    public static SinkConfig loadConfig(Config pluginConfig) {
-        SinkConfig sinkConfig = new SinkConfig();
-        sinkConfig.setNodeUrls(pluginConfig.getStringList(NODE_URLS.key()));
-        sinkConfig.setDatabase(pluginConfig.getString(DATABASE.key()));
-        sinkConfig.setTable(pluginConfig.getString(TABLE.key()));
-
-        if (pluginConfig.hasPath(USERNAME.key())) {
-            sinkConfig.setUsername(pluginConfig.getString(USERNAME.key()));
-        }
-        if (pluginConfig.hasPath(PASSWORD.key())) {
-            sinkConfig.setPassword(pluginConfig.getString(PASSWORD.key()));
-        }
-        if (pluginConfig.hasPath(LABEL_PREFIX.key())) {
-            sinkConfig.setLabelPrefix(pluginConfig.getString(LABEL_PREFIX.key()));
-        }
-        if (pluginConfig.hasPath(BATCH_MAX_SIZE.key())) {
-            sinkConfig.setBatchMaxSize(pluginConfig.getInt(BATCH_MAX_SIZE.key()));
-        }
-        if (pluginConfig.hasPath(BATCH_MAX_BYTES.key())) {
-            sinkConfig.setBatchMaxBytes(pluginConfig.getLong(BATCH_MAX_BYTES.key()));
-        }
-        if (pluginConfig.hasPath(BATCH_INTERVAL_MS.key())) {
-            sinkConfig.setBatchIntervalMs(pluginConfig.getInt(BATCH_INTERVAL_MS.key()));
-        }
-        if (pluginConfig.hasPath(MAX_RETRIES.key())) {
-            sinkConfig.setMaxRetries(pluginConfig.getInt(MAX_RETRIES.key()));
-        }
-        if (pluginConfig.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
-            sinkConfig.setRetryBackoffMultiplierMs(
-                    pluginConfig.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
-        }
-        if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
-            sinkConfig.setMaxRetryBackoffMs(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS.key()));
-        }
-        parseSinkStreamLoadProperties(pluginConfig, sinkConfig);
-        if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) {
-            sinkConfig.setColumnSeparator(sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR));
-        }
-        if (sinkConfig.streamLoadProps.containsKey(LOAD_FORMAT)) {
-            sinkConfig.setLoadFormat(
-                    StreamLoadFormat.parse(sinkConfig.streamLoadProps.get(LOAD_FORMAT)));
-        }
-        return sinkConfig;
-    }
-
-    private static void parseSinkStreamLoadProperties(Config pluginConfig, SinkConfig sinkConfig) {
-        if (CheckConfigUtil.isValidParam(pluginConfig, DORIS_CONFIG.key())) {
-            pluginConfig
-                    .getObject(DORIS_CONFIG.key())
-                    .forEach(
-                            (key, value) -> {
-                                final String configKey = key.toLowerCase();
-                                sinkConfig.streamLoadProps.put(
-                                        configKey, value.unwrapped().toString());
-                            });
-        }
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java
index b049a01c2..85bd8f55f 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java
@@ -20,7 +20,9 @@ package org.apache.seatunnel.connectors.doris.exception;
 import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
 
 public enum DorisConnectorErrorCode implements SeaTunnelErrorCode {
-    WRITE_RECORDS_FAILED("DORIS-01", "Writing records to Doris failed.");
+    STREAM_LOAD_FAILED("Doris-01", "stream load error"),
+    COMMIT_FAILED("Doris-02", "commit error"),
+    REST_SERVICE_FAILED("Doris-03", "rest service error");
 
     private final String code;
     private final String description;
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java
new file mode 100644
index 000000000..884cd7bde
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/PartitionDefinition.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.seatunnel.connectors.doris.rest;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+/** Doris partition info. */
+public class PartitionDefinition implements Serializable, Comparable<PartitionDefinition> {
+    private final String database;
+    private final String table;
+
+    private final String beAddress;
+    private final Set<Long> tabletIds;
+    private final String queryPlan;
+
+    public PartitionDefinition(
+            String database, String table, String beAddress, Set<Long> tabletIds, String queryPlan)
+            throws IllegalArgumentException {
+        this.database = database;
+        this.table = table;
+        this.beAddress = beAddress;
+        this.tabletIds = tabletIds;
+        this.queryPlan = queryPlan;
+    }
+
+    public String getBeAddress() {
+        return beAddress;
+    }
+
+    /** Returns the tablet id set given to the constructor; no defensive copy is made. */
+    public Set<Long> getTabletIds() {
+        return tabletIds;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public String getQueryPlan() {
+        return queryPlan;
+    }
+
+    /** Orders by database, table, BE address, query plan, tablet count, then tablet ids. */
+    @Override
+    public int compareTo(PartitionDefinition o) {
+        int cmp = database.compareTo(o.database);
+        if (cmp != 0) {
+            return cmp;
+        }
+        cmp = table.compareTo(o.table);
+        if (cmp != 0) {
+            return cmp;
+        }
+        cmp = beAddress.compareTo(o.beAddress);
+        if (cmp != 0) {
+            return cmp;
+        }
+        cmp = queryPlan.compareTo(o.queryPlan);
+        if (cmp != 0) {
+            return cmp;
+        }
+
+        cmp = Integer.compare(tabletIds.size(), o.tabletIds.size());
+        if (cmp != 0) {
+            return cmp;
+        }
+
+        Set<Long> similar = new HashSet<>(tabletIds);
+        Set<Long> diffSelf = new HashSet<>(tabletIds);
+        Set<Long> diffOther = new HashSet<>(o.tabletIds);
+        similar.retainAll(o.tabletIds);
+        diffSelf.removeAll(similar);
+        diffOther.removeAll(similar);
+        if (diffSelf.size() == 0) {
+            return 0;
+        }
+        // Long.compare avoids the overflow of subtracting two arbitrary long tablet ids.
+        return Long.compare(Collections.min(diffSelf), Collections.min(diffOther));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionDefinition that = (PartitionDefinition) o;
+        return Objects.equals(database, that.database)
+                && Objects.equals(table, that.table)
+                && Objects.equals(beAddress, that.beAddress)
+                && Objects.equals(tabletIds, that.tabletIds)
+                && Objects.equals(queryPlan, that.queryPlan);
+    }
+
+    @Override
+    public int hashCode() {
+        // Objects.hash tolerates null fields, matching the null-safe equals() above;
+        // calling field.hashCode() directly threw NullPointerException for such objects.
+        return Objects.hash(database, table, beAddress, queryPlan, tabletIds);
+    }
+
+    /** Returns a string listing every field of this partition definition. */
+    @Override
+    public String toString() {
+        return "PartitionDefinition{"
+                + "database='"
+                + database
+                + '\''
+                + ", table='"
+                + table
+                + '\''
+                + ", beAddress='"
+                + beAddress
+                + '\''
+                + ", tabletIds="
+                + tabletIds
+                + ", queryPlan='"
+                + queryPlan
+                + '\''
+                + '}';
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
new file mode 100644
index 000000000..ee824d024
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -0,0 +1,680 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.rest;
+
+import org.apache.seatunnel.connectors.doris.config.DorisConfig;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
+import org.apache.seatunnel.connectors.doris.rest.models.Backend;
+import org.apache.seatunnel.connectors.doris.rest.models.BackendRow;
+import org.apache.seatunnel.connectors.doris.rest.models.BackendV2;
+import org.apache.seatunnel.connectors.doris.rest.models.QueryPlan;
+import org.apache.seatunnel.connectors.doris.rest.models.Schema;
+import org.apache.seatunnel.connectors.doris.rest.models.Tablet;
+import org.apache.seatunnel.connectors.doris.util.ErrorMessages;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.StringEntity;
+
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class RestService implements Serializable {
+    public static final int REST_RESPONSE_STATUS_OK = 200;
+    public static final int REST_RESPONSE_CODE_OK = 0;
+    private static final String REST_RESPONSE_BE_ROWS_KEY = "rows";
+    private static final String API_PREFIX = "/api";
+    private static final String SCHEMA = "_schema";
+    private static final String QUERY_PLAN = "_query_plan";
+    private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
+    @Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends";
+    private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
+    private static final String FE_LOGIN = "/rest/v1/login";
+    private static final String BASE_URL = "http://%s%s";
+
+    /**
+     * Sends the request to a Doris FE with up to the configured number of attempts and returns
+     * the body; replies containing both "code" and "msg" keys are reduced to their "data" value.
+     */
+    private static String send(DorisConfig dorisConfig, HttpRequestBase request, Logger logger)
+            throws DorisConnectorException {
+        int connectTimeout =
+                dorisConfig.getRequestConnectTimeoutMs() == null
+                        ? DorisConfig.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT
+                        : dorisConfig.getRequestConnectTimeoutMs();
+        int socketTimeout =
+                dorisConfig.getRequestReadTimeoutMs() == null
+                        ? DorisConfig.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT
+                        : dorisConfig.getRequestReadTimeoutMs();
+        int retries =
+                dorisConfig.getRequestRetries() == null
+                        ? DorisConfig.DORIS_REQUEST_RETRIES_DEFAULT
+                        : dorisConfig.getRequestRetries();
+        logger.trace(
+                "connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
+                connectTimeout,
+                socketTimeout,
+                retries);
+
+        RequestConfig requestConfig =
+                RequestConfig.custom()
+                        .setConnectTimeout(connectTimeout)
+                        .setSocketTimeout(socketTimeout)
+                        .build();
+
+        request.setConfig(requestConfig);
+        logger.info(
+                "Send request to Doris FE '{}' with user '{}'.",
+                request.getURI(),
+                dorisConfig.getUsername());
+        IOException ex = null;
+        int statusCode = -1;
+
+        for (int attempt = 0; attempt < retries; attempt++) {
+            logger.debug("Attempt {} to request {}.", attempt, request.getURI());
+            try {
+                String response;
+                if (request instanceof HttpGet) {
+                    response =
+                            getConnectionGet(
+                                    request.getURI().toString(),
+                                    dorisConfig.getUsername(),
+                                    dorisConfig.getPassword(),
+                                    logger);
+                } else {
+                    response =
+                            getConnectionPost(
+                                    request,
+                                    dorisConfig.getUsername(),
+                                    dorisConfig.getPassword(),
+                                    logger);
+                }
+                if (response == null) {
+                    logger.warn(
+                            "Failed to get response from Doris FE {}, http code is {}",
+                            request.getURI(),
+                            statusCode);
+                    continue;
+                }
+                logger.trace(
+                        "Success get response from Doris FE: {}, response is: {}.",
+                        request.getURI(),
+                        response);
+                // Handle the problem of inconsistent data format returned by http v1 and v2
+                ObjectMapper mapper = new ObjectMapper();
+                Map<?, ?> map = mapper.readValue(response, Map.class);
+                if (map.containsKey("code") && map.containsKey("msg")) {
+                    Object data = map.get("data");
+                    return mapper.writeValueAsString(data);
+                } else {
+                    return response;
+                }
+            } catch (IOException e) {
+                ex = e;
+                logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, request.getURI(), e);
+            }
+        }
+        String errMsg =
+                "Connect to " + request.getURI() + " failed, status code is " + statusCode + ".";
+        throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, ex);
+    }
+
+    /** POSTs the request entity with basic auth and returns the response body. */
+    private static String getConnectionPost(
+            HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
+        URL url = new URL(request.getURI().toString());
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setInstanceFollowRedirects(false);
+        conn.setRequestMethod(request.getMethod());
+        String authEncoding =
+                Base64.getEncoder()
+                        .encodeToString((user + ":" + passwd).getBytes(StandardCharsets.UTF_8));
+        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+        InputStream content = ((HttpPost) request).getEntity().getContent();
+        // Decode and re-encode as UTF-8 explicitly; the platform default charset may differ.
+        String res = IOUtils.toString(content, StandardCharsets.UTF_8);
+        conn.setDoOutput(true);
+        conn.setDoInput(true);
+        PrintWriter out =
+                new PrintWriter(
+                        new java.io.OutputStreamWriter(
+                                conn.getOutputStream(), StandardCharsets.UTF_8));
+        out.print(res);
+        out.flush();
+        return parseResponse(conn, logger);
+    }
+
+    private static String getConnectionGet(
+            String request, String user, String passwd, Logger logger) throws IOException {
+        URL realUrl = new URL(request);
+        // open connection
+        HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
+        String authEncoding =
+                Base64.getEncoder()
+                        .encodeToString(
+                                String.format("%s:%s", user, passwd)
+                                        .getBytes(StandardCharsets.UTF_8));
+        connection.setRequestProperty("Authorization", "Basic " + authEncoding);
+
+        connection.connect();
+        return parseResponse(connection, logger);
+    }
+
+    /**
+     * Returns the body of an HTTP 200 response with line breaks removed; throws otherwise.
+     */
+    private static String parseResponse(HttpURLConnection connection, Logger logger)
+            throws IOException {
+        if (connection.getResponseCode() != HttpStatus.SC_OK) {
+            logger.warn(
+                    "Failed to get response from Doris  {}, http code is {}",
+                    connection.getURL(),
+                    connection.getResponseCode());
+            throw new IOException("Failed to get response from Doris");
+        }
+        StringBuilder result = new StringBuilder();
+        // try-with-resources: a failing getInputStream() no longer causes an NPE in finally.
+        try (BufferedReader in =
+                new BufferedReader(
+                        new InputStreamReader(
+                                connection.getInputStream(), StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = in.readLine()) != null) {
+                result.append(line);
+            }
+        } catch (IOException e) {
+            throw new IOException(e);
+        }
+        return result.toString();
+    }
+
+    @VisibleForTesting
+    static String[] parseIdentifier(String tableIdentifier, Logger logger)
+            throws DorisConnectorException {
+        logger.trace("Parse identifier '{}'.", tableIdentifier);
+        if (StringUtils.isEmpty(tableIdentifier)) {
+            String errMsg =
+                    String.format(
+                            ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
+                            "table.identifier",
+                            tableIdentifier);
+            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+        }
+        String[] identifier = tableIdentifier.split("\\.");
+        if (identifier.length != 2) {
+            String errMsg =
+                    String.format(
+                            ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
+                            "table.identifier",
+                            tableIdentifier);
+            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+        }
+        return identifier;
+    }
+
+    @VisibleForTesting
+    static String randomEndpoint(String feNodes, Logger logger) throws DorisConnectorException {
+        logger.trace("Parse fenodes '{}'.", feNodes);
+        if (StringUtils.isEmpty(feNodes)) {
+            String errMsg =
+                    String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+        }
+        List<String> nodes = Arrays.asList(feNodes.split(","));
+        Collections.shuffle(nodes);
+        return nodes.get(0).trim();
+    }
+
+    @VisibleForTesting
+    static List<String> allEndpoints(String feNodes, Logger logger) throws DorisConnectorException {
+        logger.trace("Parse fenodes '{}'.", feNodes);
+        if (StringUtils.isEmpty(feNodes)) {
+            String errMsg =
+                    String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
+            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+        }
+        List<String> nodes =
+                Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
+        Collections.shuffle(nodes);
+        return nodes;
+    }
+
+    /** Returns a randomly chosen backend from {@link #getBackendsV2} as "ip:httpPort". */
+    @VisibleForTesting
+    public static String randomBackend(DorisConfig dorisConfig, Logger logger)
+            throws DorisConnectorException, IOException {
+        List<BackendV2.BackendRowV2> backends = getBackendsV2(dorisConfig, logger);
+        logger.trace("Parse beNodes '{}'.", backends);
+        if (backends == null || backends.isEmpty()) {
+            String errMsg =
+                    String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
+            logger.error(errMsg);
+            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+        }
+        Collections.shuffle(backends);
+        return backends.get(0).getIp() + ":" + backends.get(0).getHttpPort();
+    }
+
+    public static String getBackend(DorisConfig dorisConfig, Logger logger)
+            throws DorisConnectorException {
+        try {
+            return randomBackend(dorisConfig, logger);
+        } catch (Exception e) {
+            String errMsg = "Failed to get backend via " + dorisConfig.getFrontends();
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        }
+    }
+
+    @Deprecated
+    @VisibleForTesting
+    static List<BackendRow> getBackends(DorisConfig dorisConfig, Logger logger)
+            throws DorisConnectorException, IOException {
+        String feNodes = dorisConfig.getFrontends();
+        String feNode = randomEndpoint(feNodes, logger);
+        String beUrl = String.format(BASE_URL, feNode, BACKENDS);
+        HttpGet httpGet = new HttpGet(beUrl);
+        String response = send(dorisConfig, httpGet, logger);
+        logger.info("Backend Info:{}", response);
+        List<BackendRow> backends = parseBackend(response, logger);
+        return backends;
+    }
+
+    @Deprecated
+    static List<BackendRow> parseBackend(String response, Logger logger)
+            throws DorisConnectorException, IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        Backend backend;
+        try {
+            backend = mapper.readValue(response, Backend.class);
+        } catch (JsonParseException e) {
+            String errMsg = "Doris BE's response is not a json. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        } catch (JsonMappingException e) {
+            String errMsg = "Doris BE's response cannot map to schema. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris BE's response to json failed. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        }
+
+        if (backend == null) {
+            logger.error(ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
+                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
+        }
+        List<BackendRow> backendRows =
+                backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList());
+        logger.debug("Parsing schema result is '{}'.", backendRows);
+        return backendRows;
+    }
+
+    @VisibleForTesting
+    public static List<BackendV2.BackendRowV2> getBackendsV2(DorisConfig dorisConfig, Logger logger)
+            throws DorisConnectorException, IOException {
+        String feNodes = dorisConfig.getFrontends();
+        List<String> feNodeList = allEndpoints(feNodes, logger);
+        for (String feNode : feNodeList) {
+            try {
+                String beUrl = "http://" + feNode + BACKENDS_V2;
+                HttpGet httpGet = new HttpGet(beUrl);
+                String response = send(dorisConfig, httpGet, logger);
+                logger.info("Backend Info:{}", response);
+                List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
+                return backends;
+            } catch (DorisConnectorException e) {
+                logger.info(
+                        "Doris FE node {} is unavailable: {}, Request the next Doris FE node",
+                        feNode,
+                        e.getMessage());
+            }
+        }
+        String errMsg = "No Doris FE is available, please check configuration";
+        throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+    }
+
+    static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger)
+            throws DorisConnectorException, IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        BackendV2 backend;
+        try {
+            backend = mapper.readValue(response, BackendV2.class);
+        } catch (JsonParseException e) {
+            String errMsg = "Doris BE's response is not a json. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        } catch (JsonMappingException e) {
+            String errMsg = "Doris BE's response cannot map to schema. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris BE's response to json failed. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        }
+
+        if (backend == null) {
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
+                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
+        }
+        List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
+        logger.debug("Parsing schema result is '{}'.", backendRows);
+        return backendRows;
+    }
+
+    @VisibleForTesting
+    static String getUriStr(DorisConfig dorisConfig, Logger logger) throws DorisConnectorException {
+        String[] identifier = parseIdentifier(dorisConfig.getTableIdentifier(), logger);
+        return "http://"
+                + randomEndpoint(dorisConfig.getFrontends(), logger)
+                + API_PREFIX
+                + "/"
+                + identifier[0]
+                + "/"
+                + identifier[1]
+                + "/";
+    }
+
+    public static Schema getSchema(DorisConfig dorisConfig, Logger logger)
+            throws DorisConnectorException {
+        logger.trace("Finding schema.");
+        HttpGet httpGet = new HttpGet(getUriStr(dorisConfig, logger) + SCHEMA);
+        String response = send(dorisConfig, httpGet, logger);
+        logger.debug("Find schema response is '{}'.", response);
+        return parseSchema(response, logger);
+    }
+
+    public static boolean isUniqueKeyType(DorisConfig dorisConfig, Logger logger)
+            throws DorisConnectorException {
+        try {
+            return UNIQUE_KEYS_TYPE.equals(getSchema(dorisConfig, logger).getKeysType());
+        } catch (Exception e) {
+            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, e);
+        }
+    }
+
+    @VisibleForTesting
+    public static Schema parseSchema(String response, Logger logger)
+            throws DorisConnectorException {
+        logger.trace("Parse response '{}' to schema.", response);
+        ObjectMapper mapper = new ObjectMapper();
+        Schema schema;
+        try {
+            schema = mapper.readValue(response, Schema.class);
+        } catch (JsonParseException e) {
+            String errMsg = "Doris FE's response is not a json. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        } catch (JsonMappingException e) {
+            String errMsg = "Doris FE's response cannot map to schema. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        } catch (IOException e) {
+            String errMsg = "Parse Doris FE's response to json failed. res: " + response;
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        }
+
+        if (schema == null) {
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
+                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
+        }
+
+        if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
+            String errMsg = "Doris FE's response is not OK, status is " + schema.getStatus();
+            logger.error(errMsg);
+            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+        }
+        logger.debug("Parsing schema result is '{}'.", schema);
+        return schema;
+    }
+
+    /** Asks a Doris FE for the query plan of the configured table and maps it to partitions. */
+    public static List<PartitionDefinition> findPartitions(DorisConfig dorisConfig, Logger logger)
+            throws DorisConnectorException {
+        String[] tableIdentifiers = parseIdentifier(dorisConfig.getTableIdentifier(), logger);
+        String readFields =
+                StringUtils.isBlank(dorisConfig.getReadField()) ? "*" : dorisConfig.getReadField();
+        String sql =
+                String.format(
+                        "select %s from `%s`.`%s`",
+                        readFields,
+                        tableIdentifiers[0],
+                        tableIdentifiers[1]);
+        if (!StringUtils.isEmpty(dorisConfig.getFilterQuery())) {
+            sql += " where " + dorisConfig.getFilterQuery();
+        }
+        logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
+
+        HttpPost httpPost = new HttpPost(getUriStr(dorisConfig, logger) + QUERY_PLAN);
+        // Let Jackson escape quotes and backslashes in the SQL so the body stays valid JSON.
+        String entity = new ObjectMapper().createObjectNode().put("sql", sql).toString();
+        logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
+        StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
+        stringEntity.setContentEncoding("UTF-8");
+        stringEntity.setContentType("application/json");
+        httpPost.setEntity(stringEntity);
+
+        String resStr = send(dorisConfig, httpPost, logger);
+        logger.debug("Find partition response is '{}'.", resStr);
+        QueryPlan queryPlan = getQueryPlan(resStr, logger);
+        Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
+        return tabletsMapToPartition(
+                dorisConfig,
+                be2Tablets,
+                queryPlan.getOpaquedQueryPlan(),
+                tableIdentifiers[0],
+                tableIdentifiers[1],
+                logger);
+    }
+
+    /**
+     * Deserializes the query-plan response of Doris FE and checks its status.
+     *
+     * @param response raw response body, expected to be a JSON document
+     * @param logger logger receiving the error and debug output
+     * @return the parsed plan, whose status equals {@code REST_RESPONSE_STATUS_OK}
+     * @throws DorisConnectorException if the body cannot be parsed or mapped, if the mapper
+     *     yields {@code null}, or if the reported status is not OK
+     */
+    @VisibleForTesting
+    static QueryPlan getQueryPlan(String response, Logger logger) throws DorisConnectorException {
+        final QueryPlan plan;
+        try {
+            plan = new ObjectMapper().readValue(response, QueryPlan.class);
+        } catch (IOException e) {
+            // Pick the message by failure type, most specific type first.
+            final String errMsg;
+            if (e instanceof JsonParseException) {
+                errMsg = "Doris FE's response is not a json. res: " + response;
+            } else if (e instanceof JsonMappingException) {
+                errMsg = "Doris FE's response cannot map to schema. res: " + response;
+            } else {
+                errMsg = "Parse Doris FE's response to json failed. res: " + response;
+            }
+            logger.error(errMsg, e);
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+        }
+
+        if (plan == null) {
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.REST_SERVICE_FAILED,
+                    ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
+        }
+
+        if (REST_RESPONSE_STATUS_OK != plan.getStatus()) {
+            String errMsg = "Doris FE's response is not OK, status is " + plan.getStatus();
+            logger.error(errMsg);
+            throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+        }
+        logger.debug("Parsing partition result is '{}'.", plan);
+        return plan;
+    }
+
+    /**
+     * Assigns every tablet of the query plan to one of the Doris BE nodes in its routings.
+     *
+     * <p>A routing that is not yet present in the result map is chosen immediately. Otherwise the
+     * routing with the fewest tablets assigned so far is chosen; on a tie the earlier one wins.
+     *
+     * @param queryPlan plan whose partitions map a tablet id (as string) to its tablet
+     * @param logger logger receiving the trace, debug and error output
+     * @return map from BE to the tablet ids assigned to it
+     * @throws DorisConnectorException if a tablet id is not a valid long or no BE can be chosen
+     */
+    @VisibleForTesting
+    static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger)
+            throws DorisConnectorException {
+        Map<String, List<Long>> be2Tablets = new HashMap<>();
+        for (Map.Entry<String, Tablet> entry : queryPlan.getPartitions().entrySet()) {
+            logger.debug("Parse tablet info: '{}'.", entry);
+            final String rawTabletId = entry.getKey();
+            final long tabletId;
+            try {
+                tabletId = Long.parseLong(rawTabletId);
+            } catch (NumberFormatException e) {
+                String errMsg = "Parse tablet id '" + rawTabletId + "' to long failed.";
+                logger.error(errMsg, e);
+                throw new DorisConnectorException(
+                        DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
+            }
+
+            String chosenBe = null;
+            int fewestTablets = Integer.MAX_VALUE;
+            for (String candidate : entry.getValue().getRoutings()) {
+                logger.trace("Evaluate Doris BE '{}' to tablet '{}'.", candidate, tabletId);
+                List<Long> assigned = be2Tablets.get(candidate);
+                if (assigned == null) {
+                    // Unseen BE: register it and stop looking at further routings.
+                    logger.debug(
+                            "Choice a new Doris BE '{}' for tablet '{}'.", candidate, tabletId);
+                    be2Tablets.put(candidate, new ArrayList<>());
+                    chosenBe = candidate;
+                    break;
+                }
+                if (assigned.size() < fewestTablets) {
+                    chosenBe = candidate;
+                    fewestTablets = assigned.size();
+                    logger.debug(
+                            "Current candidate Doris BE to tablet '{}' is '{}' with tablet count {}.",
+                            tabletId,
+                            chosenBe,
+                            fewestTablets);
+                }
+            }
+
+            // Reached only when the tablet has no routings at all.
+            if (chosenBe == null) {
+                String errMsg = "Cannot choice Doris BE for tablet " + tabletId;
+                logger.error(errMsg);
+                throw new DorisConnectorException(
+                        DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
+            }
+
+            logger.debug("Choice Doris BE '{}' for tablet '{}'.", chosenBe, tabletId);
+            be2Tablets.get(chosenBe).add(tabletId);
+        }
+        return be2Tablets;
+    }
+
+    /**
+     * Resolves how many tablets one partition may hold.
+     *
+     * <p>Uses the configured tablet size, or {@code DORIS_TABLET_SIZE_DEFAULT} when none is set,
+     * and raises the value to {@code DORIS_TABLET_SIZE_MIN} (with a warning) if it is smaller.
+     *
+     * @param dorisConfig configuration providing the optional tablet size
+     * @param logger logger receiving the warning and debug output
+     * @return the effective tablet count limit
+     */
+    @VisibleForTesting
+    static int tabletCountLimitForOnePartition(DorisConfig dorisConfig, Logger logger) {
+        int limit;
+        if (dorisConfig.getTabletSize() == null) {
+            limit = DorisConfig.DORIS_TABLET_SIZE_DEFAULT;
+        } else {
+            limit = dorisConfig.getTabletSize();
+        }
+
+        if (limit < DorisConfig.DORIS_TABLET_SIZE_MIN) {
+            // The message says "default", but the value applied is the minimum.
+            logger.warn(
+                    "{} is less than {}, set to default value {}.",
+                    DorisConfig.DORIS_TABLET_SIZE,
+                    DorisConfig.DORIS_TABLET_SIZE_MIN,
+                    DorisConfig.DORIS_TABLET_SIZE_MIN);
+            limit = DorisConfig.DORIS_TABLET_SIZE_MIN;
+        }
+        logger.debug("Tablet size is set to {}.", limit);
+        return limit;
+    }
+
+    /**
+     * Splits the tablets of every BE into partition definitions of bounded size.
+     *
+     * <p>Side effect: each tablet list in {@code be2Tablets} is de-duplicated in place, so its
+     * content is rewritten in the iteration order of a {@link HashSet}.
+     *
+     * @param dorisConfig configuration used to resolve the tablet count limit per partition
+     * @param be2Tablets map from BE to the tablet ids assigned to it
+     * @param opaquedQueryPlan opaque plan string passed to every partition definition
+     * @param database database name passed to every partition definition
+     * @param table table name passed to every partition definition
+     * @param logger logger receiving the debug output
+     * @return one partition definition per chunk of tablets, per BE
+     */
+    @VisibleForTesting
+    static List<PartitionDefinition> tabletsMapToPartition(
+            DorisConfig dorisConfig,
+            Map<String, List<Long>> be2Tablets,
+            String opaquedQueryPlan,
+            String database,
+            String table,
+            Logger logger)
+            throws DorisConnectorException {
+        final int chunkSize = tabletCountLimitForOnePartition(dorisConfig, logger);
+        List<PartitionDefinition> partitions = new ArrayList<>();
+        for (Map.Entry<String, List<Long>> beInfo : be2Tablets.entrySet()) {
+            logger.debug("Generate partition with beInfo: '{}'.", beInfo);
+
+            // Drop duplicate tablet ids directly in the caller's list.
+            List<Long> tablets = beInfo.getValue();
+            HashSet<Long> distinct = new HashSet<>(tablets);
+            tablets.clear();
+            tablets.addAll(distinct);
+
+            final int total = tablets.size();
+            for (int from = 0; from < total; from += chunkSize) {
+                int to = Math.min(total, from + chunkSize);
+                Set<Long> partitionTablets = new HashSet<>(tablets.subList(from, to));
+                PartitionDefinition partitionDefinition =
+                        new PartitionDefinition(
+                                database,
+                                table,
+                                beInfo.getKey(),
+                                partitionTablets,
+                                opaquedQueryPlan);
+                logger.debug("Generate one PartitionDefinition '{}'.", partitionDefinition);
+                partitions.add(partitionDefinition);
+            }
+        }
+        return partitions;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
similarity index 62%
copy from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
index 7492ec259..f151a0e72 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Backend.java
@@ -15,17 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris.client;
+package org.apache.seatunnel.connectors.doris.rest.models;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.List;
 
-@AllArgsConstructor
-@Data
-public class DorisFlushTuple {
-    private String label;
-    private Long bytes;
-    private List<byte[]> rows;
+/**
+ * JSON model of a backend (BE) listing response: a {@code rows} array of {@link BackendRow}.
+ * Unknown JSON properties are ignored during deserialization.
+ *
+ * @deprecated flagged as deprecated in source; NOTE(review): presumably superseded by
+ *     {@code BackendV2} - confirm before removing.
+ */
+@Deprecated
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Backend {
+
+    @JsonProperty(value = "rows")
+    private List<BackendRow> rows;
+
+    public List<BackendRow> getRows() {
+        return rows;
+    }
+
+    public void setRows(List<BackendRow> rows) {
+        this.rows = rows;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java
similarity index 60%
copy from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java
index 7492ec259..fe2260bea 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendRow.java
@@ -15,17 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris.client;
+package org.apache.seatunnel.connectors.doris.rest.models;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 
-import java.util.List;
+/**
+ * One element of the {@code rows} list held by {@code Backend}, bound from the JSON properties
+ * {@code HttpPort}, {@code IP} and {@code Alive}. Unknown JSON properties are ignored; accessors
+ * and {@code toString} are generated by Lombok.
+ *
+ * @deprecated flagged as deprecated in source; the replacement is not stated here - TODO confirm.
+ */
+@Getter
+@Setter
+@ToString
+@Deprecated
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BackendRow {
 
-@AllArgsConstructor
-@Data
-public class DorisFlushTuple {
-    private String label;
-    private Long bytes;
-    private List<byte[]> rows;
+    @JsonProperty(value = "HttpPort")
+    private String httpPort;
+
+    @JsonProperty(value = "IP")
+    private String ip;
+
+    @JsonProperty(value = "Alive")
+    private Boolean alive;
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java
new file mode 100644
index 000000000..47759e4bb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.rest.models;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * JSON model of a backend (BE) listing response: a {@code backends} array of
+ * {@link BackendRowV2}. Unknown top-level JSON properties are ignored.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BackendV2 {
+
+    @JsonProperty(value = "backends")
+    private List<BackendRowV2> backends;
+
+    public List<BackendRowV2> getBackends() {
+        return backends;
+    }
+
+    public void setBackends(List<BackendRowV2> backends) {
+        this.backends = backends;
+    }
+
+    /**
+     * One backend entry, bound from the JSON properties {@code ip}, {@code http_port} and
+     * {@code is_alive}.
+     *
+     * <p>Unknown properties are ignored here as well: the annotation on the enclosing class does
+     * not extend to this nested type, so without it an entry carrying an additional key would
+     * fail to deserialize under Jackson's default settings.
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class BackendRowV2 {
+        @JsonProperty("ip")
+        public String ip;
+
+        @JsonProperty("http_port")
+        public int httpPort;
+
+        @JsonProperty("is_alive")
+        public boolean isAlive;
+
+        public String getIp() {
+            return ip;
+        }
+
+        public void setIp(String ip) {
+            this.ip = ip;
+        }
+
+        public int getHttpPort() {
+            return httpPort;
+        }
+
+        public void setHttpPort(int httpPort) {
+            this.httpPort = httpPort;
+        }
+
+        public boolean isAlive() {
+            return isAlive;
+        }
+
+        public void setAlive(boolean alive) {
+            isAlive = alive;
+        }
+
+        /**
+         * Formats this backend as an address.
+         *
+         * @return {@code ip + ":" + httpPort}
+         */
+        public String toBackendString() {
+            return ip + ":" + httpPort;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java
new file mode 100644
index 000000000..8c9d00d01
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Field.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.rest.models;
+
+import java.util.Objects;
+
+/**
+ * Mutable description of one column: name, type, comment, precision, scale and aggregation type.
+ *
+ * <p>NOTE(review): {@code equals}, {@code hashCode} and {@code toString} do not take
+ * {@code aggregationType} into account - confirm whether that is intentional.
+ */
+public class Field {
+    private String name;
+    private String type;
+    private String comment;
+    private int precision;
+    private int scale;
+    private String aggregationType;
+
+    /** Creates a field with all properties unset ({@code null} or {@code 0}). */
+    public Field() {}
+
+    /** Creates a field with every property given explicitly. */
+    public Field(
+            String name,
+            String type,
+            String comment,
+            int precision,
+            int scale,
+            String aggregationType) {
+        this.name = name;
+        this.type = type;
+        this.comment = comment;
+        this.precision = precision;
+        this.scale = scale;
+        this.aggregationType = aggregationType;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getComment() {
+        return comment;
+    }
+
+    public void setComment(String comment) {
+        this.comment = comment;
+    }
+
+    public int getPrecision() {
+        return precision;
+    }
+
+    public void setPrecision(int precision) {
+        this.precision = precision;
+    }
+
+    public int getScale() {
+        return scale;
+    }
+
+    public void setScale(int scale) {
+        this.scale = scale;
+    }
+
+    public String getAggregationType() {
+        return aggregationType;
+    }
+
+    public void setAggregationType(String aggregationType) {
+        this.aggregationType = aggregationType;
+    }
+
+    /** Compares name, type, comment, precision and scale of two fields of the same class. */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || o.getClass() != getClass()) {
+            return false;
+        }
+        Field that = (Field) o;
+        // Cheap primitive checks first, then the null-safe string comparisons.
+        if (precision != that.precision || scale != that.scale) {
+            return false;
+        }
+        return Objects.equals(name, that.name)
+                && Objects.equals(type, that.type)
+                && Objects.equals(comment, that.comment);
+    }
+
+    /** Hashes the same properties that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, type, comment, precision, scale);
+    }
+
+    /** Renders name, type, comment, precision and scale. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Field{");
+        text.append("name='").append(name).append('\'');
+        text.append(", type='").append(type).append('\'');
+        text.append(", comment='").append(comment).append('\'');
+        text.append(", precision=").append(precision);
+        text.append(", scale=").append(scale);
+        return text.append('}').toString();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java
new file mode 100644
index 000000000..d59c6124c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/QueryPlan.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.rest.models;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Mutable model of a query plan: a status code, an opaque plan string and the partitions keyed
+ * by string, each mapped to a {@link Tablet}.
+ */
+public class QueryPlan {
+    private int status;
+    private String opaquedQueryPlan;
+    private Map<String, Tablet> partitions;
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    public String getOpaquedQueryPlan() {
+        return opaquedQueryPlan;
+    }
+
+    public void setOpaquedQueryPlan(String opaquedQueryPlan) {
+        this.opaquedQueryPlan = opaquedQueryPlan;
+    }
+
+    public Map<String, Tablet> getPartitions() {
+        return partitions;
+    }
+
+    public void setPartitions(Map<String, Tablet> partitions) {
+        this.partitions = partitions;
+    }
+
+    /** Compares status, opaque plan and partitions of two plans of the same class. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        QueryPlan queryPlan = (QueryPlan) o;
+        return status == queryPlan.status
+                && Objects.equals(opaquedQueryPlan, queryPlan.opaquedQueryPlan)
+                && Objects.equals(partitions, queryPlan.partitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(status, opaquedQueryPlan, partitions);
+    }
+
+    /**
+     * Renders all three properties, so that logging a plan shows its content instead of the
+     * default identity-hash representation.
+     */
+    @Override
+    public String toString() {
+        return "QueryPlan{"
+                + "status="
+                + status
+                + ", opaquedQueryPlan='"
+                + opaquedQueryPlan
+                + '\''
+                + ", partitions="
+                + partitions
+                + '}';
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/RespContent.java
similarity index 52%
copy from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/RespContent.java
index 7492ec259..7b0230576 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/RespContent.java
@@ -15,17 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris.client;
+package org.apache.seatunnel.connectors.doris.rest.models;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 
-import java.util.List;
+/**
+ * JSON response model bound from the properties {@code TxnId}, {@code Label}, {@code Status},
+ * {@code TwoPhaseCommit}, {@code ExistingJobStatus}, {@code Message} and {@code ErrorURL}.
+ * Unknown JSON properties are ignored; accessors and {@code toString} are generated by Lombok.
+ *
+ * <p>NOTE(review): presumably the body returned by a Doris load request - confirm against the
+ * caller.
+ */
+@Getter
+@Setter
+@ToString
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class RespContent {
 
-@AllArgsConstructor
-@Data
-public class DorisFlushTuple {
+    @JsonProperty(value = "TxnId")
+    private long txnId;
+
+    @JsonProperty(value = "Label")
     private String label;
-    private Long bytes;
-    private List<byte[]> rows;
+
+    @JsonProperty(value = "Status")
+    private String status;
+
+    @JsonProperty(value = "TwoPhaseCommit")
+    private String twoPhaseCommit;
+
+    @JsonProperty(value = "ExistingJobStatus")
+    private String existingJobStatus;
+
+    @JsonProperty(value = "Message")
+    private String message;
+
+    @JsonProperty(value = "ErrorURL")
+    private String errorURL;
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java
new file mode 100644
index 000000000..60e06bba9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Schema.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.rest.models;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Mutable table schema: a status code, a keys type and an ordered list of {@link Field}s.
+ *
+ * <p>NOTE(review): {@code equals}, {@code hashCode} and {@code toString} do not take
+ * {@code keysType} into account - confirm whether that is intentional.
+ */
+public class Schema {
+    private int status = 0;
+    private String keysType;
+    private List<Field> properties;
+
+    /** Creates an empty schema. */
+    public Schema() {
+        properties = new ArrayList<>();
+    }
+
+    /**
+     * Creates an empty schema whose field list is pre-sized.
+     *
+     * @param fieldCount initial capacity of the field list
+     */
+    public Schema(int fieldCount) {
+        properties = new ArrayList<>(fieldCount);
+    }
+
+    public int getStatus() {
+        return status;
+    }
+
+    public void setStatus(int status) {
+        this.status = status;
+    }
+
+    public String getKeysType() {
+        return keysType;
+    }
+
+    public void setKeysType(String keysType) {
+        this.keysType = keysType;
+    }
+
+    public List<Field> getProperties() {
+        return properties;
+    }
+
+    public void setProperties(List<Field> properties) {
+        this.properties = properties;
+    }
+
+    /**
+     * Appends a field built from the given properties.
+     *
+     * <p>Note the parameter order here is {@code scale} then {@code precision}, while the
+     * {@link Field} constructor takes {@code precision} then {@code scale}; the values are passed
+     * on by name so that each one ends up in the matching property.
+     *
+     * @param name field name
+     * @param type field type
+     * @param comment field comment
+     * @param scale field scale
+     * @param precision field precision
+     * @param aggregationType field aggregation type
+     */
+    public void put(
+            String name,
+            String type,
+            String comment,
+            int scale,
+            int precision,
+            String aggregationType) {
+        properties.add(new Field(name, type, comment, precision, scale, aggregationType));
+    }
+
+    /** Appends the given field. */
+    public void put(Field f) {
+        properties.add(f);
+    }
+
+    /**
+     * Returns the field at the given position.
+     *
+     * @param index zero-based position
+     * @return the field at {@code index}
+     * @throws IndexOutOfBoundsException if {@code index} is not smaller than {@link #size()}
+     */
+    public Field get(int index) {
+        if (index >= properties.size()) {
+            throw new IndexOutOfBoundsException(
+                    "Index: " + index + ", Fields size:" + properties.size());
+        }
+        return properties.get(index);
+    }
+
+    /** Returns the number of fields. */
+    public int size() {
+        return properties.size();
+    }
+
+    /** Compares status and fields of two schemas of the same class. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Schema schema = (Schema) o;
+        return status == schema.status && Objects.equals(properties, schema.properties);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(status, properties);
+    }
+
+    @Override
+    public String toString() {
+        return "Schema{" + "status=" + status + ", properties=" + properties + '}';
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java
new file mode 100644
index 000000000..cb5249099
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/Tablet.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.rest.models;
+
+import java.util.List;
+import java.util.Objects;
+
+/** Mutable model of one tablet: its routings, version, version hash and schema hash. */
+public class Tablet {
+    private List<String> routings;
+    private int version;
+    private long versionHash;
+    private long schemaHash;
+
+    public List<String> getRoutings() {
+        return routings;
+    }
+
+    public void setRoutings(List<String> routings) {
+        this.routings = routings;
+    }
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public long getVersionHash() {
+        return versionHash;
+    }
+
+    public void setVersionHash(long versionHash) {
+        this.versionHash = versionHash;
+    }
+
+    public long getSchemaHash() {
+        return schemaHash;
+    }
+
+    public void setSchemaHash(long schemaHash) {
+        this.schemaHash = schemaHash;
+    }
+
+    /** Compares all four properties of two tablets of the same class. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        Tablet tablet = (Tablet) o;
+        return version == tablet.version
+                && versionHash == tablet.versionHash
+                && schemaHash == tablet.schemaHash
+                && Objects.equals(routings, tablet.routings);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(routings, version, versionHash, schemaHash);
+    }
+
+    /**
+     * Renders all four properties, so that logging a tablet shows its content instead of the
+     * default identity-hash representation.
+     */
+    @Override
+    public String toString() {
+        return "Tablet{"
+                + "routings="
+                + routings
+                + ", version="
+                + version
+                + ", versionHash="
+                + versionHash
+                + ", schemaHash="
+                + schemaHash
+                + '}';
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/DorisSerializer.java
similarity index 68%
copy from seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/DorisSerializer.java
index 9f3d17050..215511ac9 100644
--- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/DorisSerializer.java
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris;
+package org.apache.seatunnel.connectors.doris.serialize;
 
-import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.io.IOException;
+import java.io.Serializable;
 
-public class DorisFactoryTest {
-    @Test
-    void optionRule() {
-        Assertions.assertNotNull((new DorisSinkFactory()).optionRule());
-    }
+/**
+ * Serializable contract for turning a {@link SeaTunnelRow} into a byte array, with explicit
+ * {@code open} and {@code close} steps.
+ *
+ * <p>NOTE(review): presumably {@code open} is called once before the first {@code serialize} and
+ * {@code close} once afterwards - confirm against the writer that drives it.
+ */
+public interface DorisSerializer extends Serializable {
+
+    /**
+     * Prepares the serializer for use.
+     *
+     * @throws IOException if preparation fails
+     */
+    void open() throws IOException;
+
+    /**
+     * Converts one row into its byte representation.
+     *
+     * @param seaTunnelRow row to serialize
+     * @return the serialized bytes
+     * @throws IOException if the row cannot be serialized
+     */
+    byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException;
+
+    /**
+     * Releases whatever the serializer holds.
+     *
+     * @throws IOException if releasing fails
+     */
+    void close() throws IOException;
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/DorisSinkOP.java
similarity index 59%
copy from seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/DorisSinkOP.java
index 9f3d17050..223be8cae 100644
--- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/DorisSinkOP.java
@@ -15,16 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris;
+package org.apache.seatunnel.connectors.doris.serialize;
 
-import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
+import org.apache.seatunnel.api.table.type.RowKind;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class DorisFactoryTest {
-    @Test
-    void optionRule() {
-        Assertions.assertNotNull((new DorisSinkFactory()).optionRule());
+/** Maps a {@link RowKind} to the delete-sign value used by the Doris sink. */
+public class DorisSinkOP {
+    /**
+     * Returns the delete sign for a row kind.
+     *
+     * @param rowKind kind of the row being written
+     * @return {@code "0"} for {@code INSERT} and {@code UPDATE_AFTER}, {@code "1"} for
+     *     {@code DELETE} and {@code UPDATE_BEFORE}
+     * @throws IllegalArgumentException if {@code rowKind} is {@code null} or any other kind
+     */
+    public static String parseDeleteSign(RowKind rowKind) {
+        if (RowKind.INSERT.equals(rowKind) || RowKind.UPDATE_AFTER.equals(rowKind)) {
+            return "0";
+        } else if (RowKind.DELETE.equals(rowKind) || RowKind.UPDATE_BEFORE.equals(rowKind)) {
+            return "1";
+        } else {
+            // Plain concatenation renders null as "null" instead of failing on toString().
+            throw new IllegalArgumentException("Unrecognized row kind:" + rowKind);
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java
new file mode 100644
index 000000000..23b7e59fe
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowConverter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
+
+import lombok.Builder;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+
+/**
+ * Base converter that turns a single SeaTunnel field value into a representation that can be
+ * embedded in a text or JSON record.
+ */
+public class SeaTunnelRowConverter {
+    // Plain initialised fields: @Builder.Default has no effect without @Builder on the class.
+    private final DateUtils.Formatter dateFormatter = DateUtils.Formatter.YYYY_MM_DD;
+
+    private final DateTimeUtils.Formatter dateTimeFormatter =
+            DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+
+    private final TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;
+
+    /**
+     * Converts one field value according to its SeaTunnel data type.
+     *
+     * <p>Numeric, boolean and string values are returned unchanged; DATE, TIME and TIMESTAMP
+     * values are formatted as strings; ARRAY and MAP values are rendered as JSON strings; BYTES
+     * values are decoded as UTF-8 text.
+     *
+     * @param dataType declared type of the field
+     * @param val field value, may be {@code null}
+     * @return the converted value, or {@code null} when {@code val} is {@code null}
+     * @throws DorisConnectorException if the SQL type of {@code dataType} is not handled
+     */
+    protected Object convert(SeaTunnelDataType dataType, Object val) {
+        if (val == null) {
+            return null;
+        }
+        switch (dataType.getSqlType()) {
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+            case BOOLEAN:
+            case STRING:
+                return val;
+            case DATE:
+                return DateUtils.toString((LocalDate) val, dateFormatter);
+            case TIME:
+                return TimeUtils.toString((LocalTime) val, timeFormatter);
+            case TIMESTAMP:
+                return DateTimeUtils.toString((LocalDateTime) val, dateTimeFormatter);
+            case ARRAY:
+            case MAP:
+                return JsonUtils.toJsonString(val);
+            case BYTES:
+                // Decode with an explicit charset instead of the platform default so the result
+                // does not vary between JVMs (the row serializer encodes its output as UTF-8).
+                return new String((byte[]) val, java.nio.charset.StandardCharsets.UTF_8);
+            default:
+                throw new DorisConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE, dataType + " is not supported ");
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
new file mode 100644
index 000000000..a0492d28e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.serialize;
+
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringJoiner;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.CSV;
+import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.JSON;
+import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.NULL_VALUE;
+
+/**
+ * Serializes a {@link SeaTunnelRow} into the UTF-8 bytes of either a JSON object or a delimited
+ * CSV line, optionally appending a delete sign derived from the row kind.
+ */
+public class SeaTunnelRowSerializer extends SeaTunnelRowConverter implements DorisSerializer {
+    String type;
+    // Only created for the JSON format; stays null for CSV.
+    private final ObjectMapper objectMapper;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final String fieldDelimiter;
+    private final boolean enableDelete;
+
+    /**
+     * @param type output format, expected to be {@code JSON} or {@code CSV}
+     * @param seaTunnelRowType row schema supplying field names and types
+     * @param fieldDelimiter separator placed between CSV fields
+     * @param enableDelete whether a delete sign is appended to every serialized row
+     */
+    public SeaTunnelRowSerializer(
+            String type,
+            SeaTunnelRowType seaTunnelRowType,
+            String fieldDelimiter,
+            boolean enableDelete) {
+        this.type = type;
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.fieldDelimiter = fieldDelimiter;
+        this.enableDelete = enableDelete;
+        this.objectMapper = JSON.equals(type) ? new ObjectMapper() : null;
+    }
+
+    /**
+     * Serializes one row in the configured format.
+     *
+     * @return the UTF-8 encoded record
+     * @throws IllegalArgumentException if the configured type is neither JSON nor CSV
+     */
+    @Override
+    public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {
+        String valString;
+        if (JSON.equals(type)) {
+            valString = buildJsonString(seaTunnelRow);
+        } else if (CSV.equals(type)) {
+            valString = buildCSVString(seaTunnelRow);
+        } else {
+            throw new IllegalArgumentException("The type " + type + " is not supported!");
+        }
+        return valString.getBytes(StandardCharsets.UTF_8);
+    }
+
+    /** Renders the row as a JSON object keyed by field name, plus the delete sign if enabled. */
+    public String buildJsonString(SeaTunnelRow row) throws IOException {
+        Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
+
+        for (int i = 0; i < row.getFields().length; i++) {
+            Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
+            rowMap.put(seaTunnelRowType.getFieldName(i), value);
+        }
+        if (enableDelete) {
+            rowMap.put(LoadConstants.DORIS_DELETE_SIGN, parseDeleteSign(row.getRowKind()));
+        }
+        return objectMapper.writeValueAsString(rowMap);
+    }
+
+    /**
+     * Renders the row as delimiter-separated text; null fields become {@code NULL_VALUE} and the
+     * delete sign, if enabled, is appended as the last column.
+     */
+    public String buildCSVString(SeaTunnelRow row) throws IOException {
+        StringJoiner joiner = new StringJoiner(fieldDelimiter);
+        for (int i = 0; i < row.getFields().length; i++) {
+            Object field = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
+            String value = field != null ? field.toString() : NULL_VALUE;
+            joiner.add(value);
+        }
+        if (enableDelete) {
+            joiner.add(parseDeleteSign(row.getRowKind()));
+        }
+        return joiner.toString();
+    }
+
+    /**
+     * Maps a row kind to its delete sign. Delegates to {@link DorisSinkOP#parseDeleteSign} so the
+     * JSON and CSV paths share a single implementation.
+     */
+    public String parseDeleteSign(RowKind rowKind) {
+        return DorisSinkOP.parseDeleteSign(rowKind);
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Builder for {@link SeaTunnelRowSerializer}. */
+    public static class Builder {
+        private SeaTunnelRowType seaTunnelRowType;
+        private String type;
+        private String fieldDelimiter;
+        private boolean deletable;
+
+        public Builder setType(String type) {
+            this.type = type;
+            return this;
+        }
+
+        public Builder setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
+            this.seaTunnelRowType = seaTunnelRowType;
+            return this;
+        }
+
+        public Builder setFieldDelimiter(String fieldDelimiter) {
+            this.fieldDelimiter = fieldDelimiter;
+            return this;
+        }
+
+        public Builder enableDelete(boolean deletable) {
+            this.deletable = deletable;
+            return this;
+        }
+
+        /**
+         * Creates the serializer.
+         *
+         * @throws IllegalStateException unless the type is JSON, or CSV with a field delimiter
+         */
+        public SeaTunnelRowSerializer build() {
+            boolean validCsv = CSV.equals(type) && fieldDelimiter != null;
+            checkState(
+                    validCsv || JSON.equals(type),
+                    "Format must be JSON, or CSV with a field delimiter, but was: %s",
+                    type);
+            return new SeaTunnelRowSerializer(type, seaTunnelRowType, fieldDelimiter, deletable);
+        }
+    }
+
+    @Override
+    public void open() throws IOException {}
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
index 02f43b9d1..018eb44bd 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSink.java
@@ -21,7 +21,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -29,20 +32,25 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.doris.config.DorisConfig;
 import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
+import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
+import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfoSerializer;
+import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitter;
+import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkState;
+import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkStateSerializer;
+import org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter;
 
 import com.google.auto.service.AutoService;
 
-import static org.apache.seatunnel.connectors.doris.config.SinkConfig.DATABASE;
-import static org.apache.seatunnel.connectors.doris.config.SinkConfig.NODE_URLS;
-import static org.apache.seatunnel.connectors.doris.config.SinkConfig.PASSWORD;
-import static org.apache.seatunnel.connectors.doris.config.SinkConfig.TABLE;
-import static org.apache.seatunnel.connectors.doris.config.SinkConfig.USERNAME;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
 
 @AutoService(SeaTunnelSink.class)
-public class DorisSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
+public class DorisSink
+        implements SeaTunnelSink<SeaTunnelRow, DorisSinkState, DorisCommitInfo, DorisCommitInfo> {
 
     private Config pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
@@ -58,11 +66,9 @@ public class DorisSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
         CheckResult result =
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
-                        NODE_URLS.key(),
-                        DATABASE.key(),
-                        TABLE.key(),
-                        USERNAME.key(),
-                        PASSWORD.key());
+                        DorisConfig.FENODES.key(),
+                        DorisConfig.USERNAME.key(),
+                        DorisConfig.TABLE_IDENTIFIER.key());
         if (!result.isSuccess()) {
             throw new DorisConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -83,7 +89,47 @@ public class DorisSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) {
-        return new DorisSinkWriter(pluginConfig, seaTunnelRowType);
+    public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> createWriter(
+            SinkWriter.Context context) throws IOException {
+        // A newly created writer is given an empty state list, both at construction and when
+        // its load is initialized.
+        final DorisSinkWriter writer =
+                new DorisSinkWriter(
+                        context, Collections.emptyList(), seaTunnelRowType, pluginConfig);
+        writer.initializeLoad(Collections.emptyList());
+        return writer;
+    }
+
+    /** Builds a writer from the supplied writer states and initializes its load with them. */
+    @Override
+    public SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> restoreWriter(
+            SinkWriter.Context context, List<DorisSinkState> states) throws IOException {
+        final DorisSinkWriter restored =
+                new DorisSinkWriter(context, states, seaTunnelRowType, pluginConfig);
+        restored.initializeLoad(states);
+        return restored;
+    }
+
+    /** Returns a new {@link DorisSinkStateSerializer} wrapped in an {@link Optional}. */
+    @Override
+    public Optional<Serializer<DorisSinkState>> getWriterStateSerializer() {
+        final Serializer<DorisSinkState> stateSerializer = new DorisSinkStateSerializer();
+        return Optional.of(stateSerializer);
+    }
+
+    /** Returns a new {@link DorisCommitter} built from the plugin configuration. */
+    @Override
+    public Optional<SinkCommitter<DorisCommitInfo>> createCommitter() throws IOException {
+        final SinkCommitter<DorisCommitInfo> committer = new DorisCommitter(pluginConfig);
+        return Optional.of(committer);
+    }
+
+    /** Returns a new {@link DorisCommitInfoSerializer} wrapped in an {@link Optional}. */
+    @Override
+    public Optional<Serializer<DorisCommitInfo>> getCommitInfoSerializer() {
+        final Serializer<DorisCommitInfo> commitInfoSerializer = new DorisCommitInfoSerializer();
+        return Optional.of(commitInfoSerializer);
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<DorisCommitInfo, DorisCommitInfo>>
+            createAggregatedCommitter() throws IOException {
+        return Optional.empty(); // this sink supplies no aggregated committer
+    }
+
+    @Override
+    public Optional<Serializer<DorisCommitInfo>> getAggregatedCommitInfoSerializer() {
+        return Optional.empty(); // no serializer is supplied for aggregated commit info
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
deleted file mode 100644
index dbbb3b701..000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.sink;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import org.apache.seatunnel.connectors.doris.config.SinkConfig;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(Factory.class)
-public class DorisSinkFactory implements TableSinkFactory {
-    @Override
-    public String factoryIdentifier() {
-        return "Doris";
-    }
-
-    @Override
-    public OptionRule optionRule() {
-        return OptionRule.builder()
-                .required(
-                        SinkConfig.NODE_URLS,
-                        SinkConfig.USERNAME,
-                        SinkConfig.PASSWORD,
-                        SinkConfig.DATABASE,
-                        SinkConfig.TABLE)
-                .optional(
-                        SinkConfig.LABEL_PREFIX,
-                        SinkConfig.BATCH_MAX_SIZE,
-                        SinkConfig.BATCH_MAX_BYTES,
-                        SinkConfig.BATCH_INTERVAL_MS,
-                        SinkConfig.MAX_RETRIES,
-                        SinkConfig.MAX_RETRY_BACKOFF_MS,
-                        SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS,
-                        SinkConfig.DORIS_CONFIG)
-                .build();
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkWriter.java
deleted file mode 100644
index 2a90d8210..000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/DorisSinkWriter.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.sink;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.SerializationSchema;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.doris.client.DorisSinkManager;
-import org.apache.seatunnel.connectors.doris.config.SinkConfig;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.util.DelimiterParserUtil;
-import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import org.apache.seatunnel.format.json.JsonSerializationSchema;
-import org.apache.seatunnel.format.text.TextSerializationSchema;
-
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class DorisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
-
-    private ReadonlyConfig readonlyConfig;
-    private final SerializationSchema serializationSchema;
-    private final DorisSinkManager manager;
-
-    public DorisSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig);
-        List<String> fieldNames =
-                Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList());
-        this.serializationSchema = createSerializer(sinkConfig, seaTunnelRowType);
-        this.manager = new DorisSinkManager(sinkConfig, fieldNames);
-    }
-
-    @Override
-    public void write(SeaTunnelRow element) throws IOException {
-        String record = new String(serializationSchema.serialize(element));
-        manager.write(record);
-    }
-
-    @SneakyThrows
-    @Override
-    public Optional<Void> prepareCommit() {
-        // Flush to storage before snapshot state is performed
-        manager.flush();
-        return super.prepareCommit();
-    }
-
-    @Override
-    public void close() throws IOException {
-        try {
-            if (manager != null) {
-                manager.close();
-            }
-        } catch (IOException e) {
-            throw new DorisConnectorException(
-                    CommonErrorCode.WRITER_OPERATION_FAILED, "Close doris manager failed.", e);
-        }
-    }
-
-    public static SerializationSchema createSerializer(
-            SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
-            String columnSeparator =
-                    DelimiterParserUtil.parse(sinkConfig.getColumnSeparator(), "\t");
-            return TextSerializationSchema.builder()
-                    .seaTunnelRowType(seaTunnelRowType)
-                    .delimiter(columnSeparator)
-                    .build();
-        }
-        if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) {
-            return new JsonSerializationSchema(seaTunnelRowType);
-        }
-        throw new DorisConnectorException(
-                CommonErrorCode.ILLEGAL_ARGUMENT,
-                "Failed to create row serializer, unsupported `format` from stream load properties.");
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/HttpPutBuilder.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/HttpPutBuilder.java
new file mode 100644
index 000000000..100719f57
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/HttpPutBuilder.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.sink;
+
+import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Fluent builder that collects a URL, request headers and an entity, then creates an {@link
+ * HttpPut} from them.
+ */
+public class HttpPutBuilder {
+    String url;
+    Map<String, String> header;
+    HttpEntity httpEntity;
+
+    public HttpPutBuilder() {
+        header = new HashMap<>();
+    }
+
+    /** Sets the request URL; required before {@link #build()}. */
+    public HttpPutBuilder setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    /** Adds the {@code Expect: 100-continue} header. */
+    public HttpPutBuilder addCommonHeader() {
+        header.put(HttpHeaders.EXPECT, "100-continue");
+        return this;
+    }
+
+    /** Adds the {@code hidden_columns} header carrying the delete-sign column when requested. */
+    public HttpPutBuilder addHiddenColumns(boolean add) {
+        if (add) {
+            header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
+        }
+        return this;
+    }
+
+    /** Adds the {@code two_phase_commit: true} header. */
+    public HttpPutBuilder enable2PC() {
+        header.put("two_phase_commit", "true");
+        return this;
+    }
+
+    /** Adds an HTTP Basic {@code Authorization} header built from {@code user:password}. */
+    public HttpPutBuilder baseAuth(String user, String password) {
+        final String authInfo = user + ":" + password;
+        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+        // Decode with an explicit charset rather than the platform default.
+        header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded, StandardCharsets.UTF_8));
+        return this;
+    }
+
+    /** Adds the {@code txn_id} header. */
+    public HttpPutBuilder addTxnId(long txnID) {
+        header.put("txn_id", String.valueOf(txnID));
+        return this;
+    }
+
+    /** Sets the {@code txn_operation} header to {@code commit}. */
+    public HttpPutBuilder commit() {
+        header.put("txn_operation", "commit");
+        return this;
+    }
+
+    /** Sets the {@code txn_operation} header to {@code abort}. */
+    public HttpPutBuilder abort() {
+        header.put("txn_operation", "abort");
+        return this;
+    }
+
+    /** Sets the request entity; an entity is required before {@link #build()}. */
+    public HttpPutBuilder setEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        return this;
+    }
+
+    /** Uses an empty string entity as the request body. */
+    public HttpPutBuilder setEmptyEntity() {
+        try {
+            this.httpEntity = new StringEntity("");
+        } catch (Exception e) {
+            throw new IllegalArgumentException(e);
+        }
+        return this;
+    }
+
+    /** Copies every property into the headers, converting keys and values to strings. */
+    public HttpPutBuilder addProperties(Properties properties) {
+        properties.forEach((key, value) -> header.put(String.valueOf(key), String.valueOf(value)));
+        return this;
+    }
+
+    /** Adds the {@code label} header. */
+    public HttpPutBuilder setLabel(String label) {
+        header.put("label", label);
+        return this;
+    }
+
+    /**
+     * Creates the request from the collected URL, headers and entity.
+     *
+     * @throws NullPointerException if the URL or the entity has not been set
+     */
+    public HttpPut build() {
+        checkNotNull(url, "url must be set before build()");
+        checkNotNull(httpEntity, "an entity must be set before build()");
+        HttpPut put = new HttpPut(url);
+        header.forEach(put::setHeader);
+        put.setEntity(httpEntity);
+        return put;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/LoadStatus.java
similarity index 69%
copy from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/LoadStatus.java
index 7492ec259..3beb9d992 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/LoadStatus.java
@@ -15,17 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris.client;
+package org.apache.seatunnel.connectors.doris.sink;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.util.List;
-
-@AllArgsConstructor
-@Data
-public class DorisFlushTuple {
-    private String label;
-    private Long bytes;
-    private List<byte[]> rows;
+/** Holder of load status string constants (a plain class, not a Java enum). */
+public class LoadStatus {
+    public static final String SUCCESS = "Success";
+    public static final String PUBLISH_TIMEOUT = "Publish Timeout";
+    public static final String LABEL_ALREADY_EXIST = "Label Already Exists";
+    public static final String FAIL = "Fail";
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitInfo.java
similarity index 60%
copy from seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitInfo.java
index 9f3d17050..8682959a5 100644
--- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitInfo.java
@@ -15,16 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris;
+package org.apache.seatunnel.connectors.doris.sink.committer;
 
-import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.io.Serializable;
 
-public class DorisFactoryTest {
-    @Test
-    void optionRule() {
-        Assertions.assertNotNull((new DorisSinkFactory()).optionRule());
+/**
+ * Immutable value object describing one transaction to commit: a host/port string, a database
+ * name and a transaction id.
+ *
+ * <p>All fields are final, so a class-level {@code @Setter} would generate nothing and has been
+ * dropped; only getters, {@code toString}, {@code equals} and {@code hashCode} are generated.
+ */
+@Getter
+@ToString
+@EqualsAndHashCode
+public class DorisCommitInfo implements Serializable {
+    private final String hostPort;
+    private final String db;
+    // NOTE(review): "txbID" looks like a typo of "txnID"; the name is kept because the generated
+    // getter getTxbID() is called by DorisCommitInfoSerializer.
+    private final long txbID;
+
+    public DorisCommitInfo(String hostPort, String db, long txbID) {
+        this.hostPort = hostPort;
+        this.db = db;
+        this.txbID = txbID;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitInfoSerializer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitInfoSerializer.java
new file mode 100644
index 000000000..46808d119
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitInfoSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.sink.committer;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Binary {@link Serializer} for {@link DorisCommitInfo}. */
+public class DorisCommitInfoSerializer implements Serializer<DorisCommitInfo> {
+
+    /**
+     * Writes the host/port and database with {@code writeUTF}, followed by the transaction id as
+     * a long.
+     */
+    @Override
+    public byte[] serialize(DorisCommitInfo dorisCommittable) throws IOException {
+        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try (DataOutputStream sink = new DataOutputStream(buffer)) {
+            sink.writeUTF(dorisCommittable.getHostPort());
+            sink.writeUTF(dorisCommittable.getDb());
+            sink.writeLong(dorisCommittable.getTxbID());
+            sink.flush();
+            return buffer.toByteArray();
+        }
+    }
+
+    /** Reads the fields back in the order they were written by {@link #serialize}. */
+    @Override
+    public DorisCommitInfo deserialize(byte[] serialized) throws IOException {
+        try (DataInputStream source = new DataInputStream(new ByteArrayInputStream(serialized))) {
+            final String hostPort = source.readUTF();
+            final String db = source.readUTF();
+            return new DorisCommitInfo(hostPort, db, source.readLong());
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
new file mode 100644
index 000000000..7c0e917d2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.sink.committer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.sink.SinkCommitter;
+import org.apache.seatunnel.connectors.doris.config.DorisConfig;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
+import org.apache.seatunnel.connectors.doris.rest.RestService;
+import org.apache.seatunnel.connectors.doris.sink.HttpPutBuilder;
+import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
+import org.apache.seatunnel.connectors.doris.util.HttpUtil;
+import org.apache.seatunnel.connectors.doris.util.ResponseUtil;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Committer that finishes (or rolls back) pre-committed Doris stream load transactions by
+ * calling the {@code _stream_load_2pc} endpoint of a backend.
+ *
+ * <p>Each request is attempted up to {@code maxRetry + 1} times. After a failed attempt another
+ * backend is obtained through {@link RestService#getBackend} and the request is sent there.
+ */
+@Slf4j
+public class DorisCommitter implements SinkCommitter<DorisCommitInfo> {
+    private static final String COMMIT_PATTERN = "http://%s/api/%s/_stream_load_2pc";
+    /** HTTP status that marks a request as accepted by the backend. */
+    private static final int HTTP_OK = 200;
+    /** Shared mapper for the JSON reply bodies; created once instead of once per request. */
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private final CloseableHttpClient httpClient;
+    private final DorisConfig dorisConfig;
+    int maxRetry;
+
+    /**
+     * Creates a committer from the raw plugin configuration.
+     *
+     * @param pluginConfig the sink plugin configuration
+     */
+    public DorisCommitter(Config pluginConfig) {
+        this(DorisConfig.loadConfig(pluginConfig));
+    }
+
+    /** Derives the retry count and the HTTP client from a configuration that is parsed once. */
+    private DorisCommitter(DorisConfig dorisConfig) {
+        this(dorisConfig, dorisConfig.getMaxRetries(), new HttpUtil().getHttpClient());
+    }
+
+    /**
+     * Creates a committer with explicit collaborators.
+     *
+     * @param dorisConfig parsed Doris configuration (credentials, backend discovery)
+     * @param maxRetry number of retries after the first attempt
+     * @param client HTTP client used for all requests
+     */
+    public DorisCommitter(DorisConfig dorisConfig, int maxRetry, CloseableHttpClient client) {
+        this.dorisConfig = dorisConfig;
+        this.maxRetry = maxRetry;
+        this.httpClient = client;
+    }
+
+    /**
+     * Commits every given transaction in order.
+     *
+     * @param commitInfos transactions to commit
+     * @return always an empty list
+     * @throws IOException if reading a backend reply fails
+     */
+    @Override
+    public List<DorisCommitInfo> commit(List<DorisCommitInfo> commitInfos) throws IOException {
+        for (DorisCommitInfo commitInfo : commitInfos) {
+            commitTransaction(commitInfo);
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Aborts every given transaction in order.
+     *
+     * @param commitInfos transactions to abort
+     * @throws IOException if the backend cannot be reached or its reply cannot be read
+     */
+    @Override
+    public void abort(List<DorisCommitInfo> commitInfos) throws IOException {
+        for (DorisCommitInfo commitInfo : commitInfos) {
+            abortTransaction(commitInfo);
+        }
+    }
+
+    /**
+     * Sends the commit request for one transaction, retrying on other backends.
+     *
+     * @throws DorisConnectorException if no attempt returned HTTP 200, or the reply reports a
+     *     failure that {@link ResponseUtil#isCommitted} does not recognise as already committed
+     */
+    private void commitTransaction(DorisCommitInfo committable)
+            throws IOException, DorisConnectorException {
+        String reasonPhrase = null;
+        int retry = 0;
+        String hostPort = committable.getHostPort();
+        CloseableHttpResponse response = null;
+        while (retry++ <= maxRetry) {
+            HttpPutBuilder putBuilder = new HttpPutBuilder();
+            putBuilder
+                    .setUrl(String.format(COMMIT_PATTERN, hostPort, committable.getDb()))
+                    .baseAuth(dorisConfig.getUsername(), dorisConfig.getPassword())
+                    .addCommonHeader()
+                    .addTxnId(committable.getTxbID())
+                    .setEmptyEntity()
+                    .commit();
+            try {
+                response = httpClient.execute(putBuilder.build());
+            } catch (IOException e) {
+                log.error("commit transaction failed: ", e);
+                hostPort = RestService.getBackend(dorisConfig, log);
+                continue;
+            }
+            if (response.getStatusLine().getStatusCode() == HTTP_OK) {
+                break;
+            }
+            reasonPhrase = response.getStatusLine().getReasonPhrase();
+            log.warn("commit failed with {}, reason {}", hostPort, reasonPhrase);
+            // Release the connection of the rejected attempt before trying another backend.
+            closeQuietly(response);
+            response = null;
+            hostPort = RestService.getBackend(dorisConfig, log);
+        }
+
+        // A non-null response here is always the HTTP 200 one that ended the loop.
+        if (response == null) {
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.STREAM_LOAD_FAILED, reasonPhrase);
+        }
+
+        try {
+            if (response.getEntity() != null) {
+                String loadResult = EntityUtils.toString(response.getEntity());
+                Map<String, String> res =
+                        OBJECT_MAPPER.readValue(
+                                loadResult, new TypeReference<HashMap<String, String>>() {});
+                // Constant-first equals: a reply without a "status" field must not cause an NPE.
+                if (LoadStatus.FAIL.equals(res.get("status"))
+                        && !ResponseUtil.isCommitted(res.get("msg"))) {
+                    throw new DorisConnectorException(
+                            DorisConnectorErrorCode.COMMIT_FAILED, loadResult);
+                } else {
+                    log.info("load result {}", loadResult);
+                }
+            }
+        } finally {
+            closeQuietly(response);
+        }
+    }
+
+    /**
+     * Sends the abort request for one transaction, retrying on other backends.
+     *
+     * <p>The loop stops at the first HTTP 200 reply that carries a body, so a successful abort
+     * is sent exactly once.
+     *
+     * @throws IOException if the last attempt failed with an I/O error
+     * @throws DorisConnectorException if the last attempt was rejected by the backend, or the
+     *     reply indicates the transaction was already committed
+     */
+    private void abortTransaction(DorisCommitInfo committable)
+            throws IOException, DorisConnectorException {
+        int retry = 0;
+        String hostPort = committable.getHostPort();
+        String url = String.format(COMMIT_PATTERN, hostPort, committable.getDb());
+        CloseableHttpResponse response = null;
+        IOException ioFailure = null;
+        while (retry++ <= maxRetry) {
+            url = String.format(COMMIT_PATTERN, hostPort, committable.getDb());
+            HttpPutBuilder builder = new HttpPutBuilder();
+            builder.setUrl(url)
+                    .baseAuth(dorisConfig.getUsername(), dorisConfig.getPassword())
+                    .addCommonHeader()
+                    .addTxnId(committable.getTxbID())
+                    .setEmptyEntity()
+                    .abort();
+            try {
+                response = httpClient.execute(builder.build());
+            } catch (IOException e) {
+                log.error("abort transaction failed: ", e);
+                ioFailure = e;
+                hostPort = RestService.getBackend(dorisConfig, log);
+                continue;
+            }
+            ioFailure = null;
+            if (response.getStatusLine().getStatusCode() == HTTP_OK
+                    && response.getEntity() != null) {
+                break;
+            }
+            log.warn("abort transaction response: " + response.getStatusLine().toString());
+            // Release the connection of the rejected attempt before trying another backend.
+            closeQuietly(response);
+            response = null;
+            hostPort = RestService.getBackend(dorisConfig, log);
+        }
+
+        if (response == null) {
+            if (ioFailure != null) {
+                // Keep surfacing transport problems as IOException, as a single attempt would.
+                throw ioFailure;
+            }
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.STREAM_LOAD_FAILED,
+                    "Fail to abort transaction " + committable.getTxbID() + " with url " + url);
+        }
+
+        try {
+            String loadResult = EntityUtils.toString(response.getEntity());
+            Map<String, String> res =
+                    OBJECT_MAPPER.readValue(
+                            loadResult, new TypeReference<HashMap<String, String>>() {});
+            if (!LoadStatus.SUCCESS.equals(res.get("status"))) {
+                if (ResponseUtil.isCommitted(res.get("msg"))) {
+                    throw new DorisConnectorException(
+                            DorisConnectorErrorCode.STREAM_LOAD_FAILED,
+                            "try abort committed transaction, do you recover from old savepoint?");
+                }
+                log.warn(
+                        "Fail to abort transaction. txnId: {}, error: {}",
+                        committable.getTxbID(),
+                        res.get("msg"));
+            }
+        } finally {
+            closeQuietly(response);
+        }
+    }
+
+    /** Closes a response without letting a close failure mask the outcome of the request. */
+    private static void closeQuietly(CloseableHttpResponse response) {
+        if (response == null) {
+            return;
+        }
+        try {
+            response.close();
+        } catch (IOException e) {
+            log.debug("failed to close http response", e);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
similarity index 63%
copy from seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
index 9f3d17050..03179f92e 100644
--- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkState.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris;
+package org.apache.seatunnel.connectors.doris.sink.writer;
 
-import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+@Setter
+@Getter
+@ToString
+@EqualsAndHashCode
+public class DorisSinkState {
+    private final String labelPrefix;
+    private final long checkpointId;
 
-public class DorisFactoryTest {
-    @Test
-    void optionRule() {
-        Assertions.assertNotNull((new DorisSinkFactory()).optionRule());
+    public DorisSinkState(String labelPrefix, long checkpointId) {
+        this.labelPrefix = labelPrefix;
+        this.checkpointId = checkpointId;
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkStateSerializer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkStateSerializer.java
new file mode 100644
index 000000000..213b1c785
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkStateSerializer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.sink.writer;
+
+import org.apache.seatunnel.api.serialization.Serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Serializer for {@link DorisSinkState}.
+ *
+ * <p>The payload is the label prefix (modified UTF-8) followed by the checkpoint id as a long.
+ */
+public class DorisSinkStateSerializer implements Serializer<DorisSinkState> {
+    /**
+     * Encodes a sink state into bytes.
+     *
+     * @param dorisSinkState the state to encode
+     * @return the encoded bytes
+     * @throws IOException if writing to the in-memory buffer fails
+     */
+    @Override
+    public byte[] serialize(DorisSinkState dorisSinkState) throws IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeUTF(dorisSinkState.getLabelPrefix());
+            // deserialize() expects the checkpoint id after the prefix; without this write
+            // every round trip ended in an EOFException.
+            out.writeLong(dorisSinkState.getCheckpointId());
+            out.flush();
+            return baos.toByteArray();
+        }
+    }
+
+    /**
+     * Decodes bytes produced by {@link #serialize(DorisSinkState)}.
+     *
+     * @param serialized the encoded bytes
+     * @return the decoded state
+     * @throws IOException if the label prefix cannot be read
+     */
+    @Override
+    public DorisSinkState deserialize(byte[] serialized) throws IOException {
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+                final DataInputStream in = new DataInputStream(bais)) {
+            final String labelPrefix = in.readUTF();
+            // Payloads that carry only the prefix (no trailing long) fall back to checkpoint 0.
+            final long checkpointId = in.available() >= Long.BYTES ? in.readLong() : 0L;
+            return new DorisSinkState(labelPrefix, checkpointId);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
new file mode 100644
index 000000000..744db83e6
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.sink.writer;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.doris.config.DorisConfig;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
+import org.apache.seatunnel.connectors.doris.rest.RestService;
+import org.apache.seatunnel.connectors.doris.rest.models.BackendV2;
+import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
+import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
+import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
+import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
+import org.apache.seatunnel.connectors.doris.util.HttpUtil;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkState;
+
+@Slf4j
+public class DorisSinkWriter implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState> {
+    private static final int INITIAL_DELAY = 200;
+    private static final int CONNECT_TIMEOUT = 1000;
+    private static final List<String> DORIS_SUCCESS_STATUS =
+            new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS, LoadStatus.PUBLISH_TIMEOUT));
+    private final long lastCheckpointId;
+    private DorisStreamLoad dorisStreamLoad;
+    volatile boolean loading;
+    private final DorisConfig dorisConfig;
+    private final String labelPrefix;
+    private final LabelGenerator labelGenerator;
+    private final int intervalTime;
+    private final DorisSinkState dorisSinkState;
+    private final DorisSerializer serializer;
+    private final transient ScheduledExecutorService scheduledExecutorService;
+    private transient Thread executorThread;
+    private transient volatile Exception loadException = null;
+    private List<BackendV2.BackendRowV2> backends;
+    private long pos;
+
+    public DorisSinkWriter(
+            SinkWriter.Context context,
+            List<DorisSinkState> state,
+            SeaTunnelRowType seaTunnelRowType,
+            Config pluginConfig) {
+        this.dorisConfig = DorisConfig.loadConfig(pluginConfig);
+        this.lastCheckpointId = state.size() != 0 ? state.get(0).getCheckpointId() : 0;
+        log.info("restore checkpointId {}", lastCheckpointId);
+        log.info("labelPrefix " + dorisConfig.getLabelPrefix());
+        this.dorisSinkState = new DorisSinkState(dorisConfig.getLabelPrefix(), lastCheckpointId);
+        this.labelPrefix = dorisConfig.getLabelPrefix() + "_" + context.getIndexOfSubtask();
+        this.labelGenerator = new LabelGenerator(labelPrefix, dorisConfig.getEnable2PC());
+        this.scheduledExecutorService =
+                new ScheduledThreadPoolExecutor(
+                        1, new ThreadFactoryBuilder().setNameFormat("stream-load-check").build());
+        this.serializer = createSerializer(dorisConfig, seaTunnelRowType);
+        this.intervalTime = dorisConfig.getCheckInterval();
+        this.loading = false;
+    }
+
+    public void initializeLoad(List<DorisSinkState> state) throws IOException {
+        this.backends = RestService.getBackendsV2(dorisConfig, log);
+        String backend = getAvailableBackend();
+        try {
+            this.dorisStreamLoad =
+                    new DorisStreamLoad(
+                            backend, dorisConfig, labelGenerator, new HttpUtil().getHttpClient());
+            if (dorisConfig.getEnable2PC()) {
+                dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+            }
+        } catch (Exception e) {
+            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+        }
+        // get main work thread.
+        executorThread = Thread.currentThread();
+        dorisStreamLoad.startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
+        // when uploading data in streaming mode, we need to regularly detect whether there are
+        // exceptions.
+        scheduledExecutorService.scheduleWithFixedDelay(
+                this::checkDone, INITIAL_DELAY, intervalTime, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        checkLoadException();
+        byte[] serialize = serializer.serialize(element);
+        if (Objects.isNull(serialize)) {
+            return;
+        }
+        dorisStreamLoad.writeRecord(serialize);
+    }
+
+    @Override
+    public Optional<DorisCommitInfo> prepareCommit() throws IOException {
+        // disable exception checker before stop load.
+        loading = false;
+        checkState(dorisStreamLoad != null);
+        RespContent respContent = dorisStreamLoad.stopLoad();
+        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+            String errMsg =
+                    String.format(
+                            "stream load error: %s, see more in %s",
+                            respContent.getMessage(), respContent.getErrorURL());
+            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, errMsg);
+        }
+        if (!dorisConfig.getEnable2PC()) {
+            return Optional.empty();
+        }
+        long txnId = respContent.getTxnId();
+
+        return Optional.of(
+                new DorisCommitInfo(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
+    }
+
+    @Override
+    public List<DorisSinkState> snapshotState(long checkpointId) throws IOException {
+        checkState(dorisStreamLoad != null);
+        this.dorisStreamLoad.setHostPort(getAvailableBackend());
+        this.dorisStreamLoad.startLoad(labelGenerator.generateLabel(checkpointId + 1));
+        this.loading = true;
+        return Collections.singletonList(dorisSinkState);
+    }
+
+    @Override
+    public void abortPrepare() {
+        if (dorisConfig.getEnable2PC()) {
+            try {
+                dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void checkDone() {
+        // the load future is done and checked in prepareCommit().
+        // this will check error while loading.
+        log.debug("start timer checker, interval {} ms", intervalTime);
+        if (dorisStreamLoad.getPendingLoadFuture() != null
+                && dorisStreamLoad.getPendingLoadFuture().isDone()) {
+            if (!loading) {
+                log.debug("not loading, skip timer checker");
+                return;
+            }
+            String errorMsg;
+            try {
+                RespContent content =
+                        dorisStreamLoad.handlePreCommitResponse(
+                                dorisStreamLoad.getPendingLoadFuture().get());
+                errorMsg = content.getMessage();
+            } catch (Exception e) {
+                errorMsg = e.getMessage();
+            }
+
+            loadException =
+                    new DorisConnectorException(
+                            DorisConnectorErrorCode.STREAM_LOAD_FAILED, errorMsg);
+            log.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
+            // set the executor thread interrupted in case blocking in write data.
+            executorThread.interrupt();
+        }
+    }
+
+    private void checkLoadException() {
+        if (loadException != null) {
+            throw new RuntimeException("error while loading data.", loadException);
+        }
+    }
+
+    @VisibleForTesting
+    public boolean isLoading() {
+        return this.loading;
+    }
+
+    @VisibleForTesting
+    public void setDorisStreamLoad(DorisStreamLoad streamLoad) {
+        this.dorisStreamLoad = streamLoad;
+    }
+
+    @VisibleForTesting
+    public void setBackends(List<BackendV2.BackendRowV2> backends) {
+        this.backends = backends;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (scheduledExecutorService != null) {
+            scheduledExecutorService.shutdownNow();
+        }
+        if (dorisStreamLoad != null) {
+            dorisStreamLoad.close();
+        }
+    }
+
+    @VisibleForTesting
+    public String getAvailableBackend() {
+        long tmp = pos + backends.size();
+        while (pos < tmp) {
+            BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
+            String res = backend.toBackendString();
+            if (tryHttpConnection(res)) {
+                pos++;
+                return res;
+            }
+        }
+        String errMsg = "no available backend.";
+        throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, errMsg);
+    }
+
+    public boolean tryHttpConnection(String backend) {
+        try {
+            backend = "http://" + backend;
+            URL url = new URL(backend);
+            HttpURLConnection co = (HttpURLConnection) url.openConnection();
+            co.setConnectTimeout(CONNECT_TIMEOUT);
+            co.connect();
+            co.disconnect();
+            return true;
+        } catch (Exception ex) {
+            log.warn("Failed to connect to backend:{}", backend, ex);
+            pos++;
+            return false;
+        }
+    }
+
+    private DorisSerializer createSerializer(
+            DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) {
+        return new SeaTunnelRowSerializer(
+                dorisConfig
+                        .getStreamLoadProps()
+                        .getProperty(LoadConstants.FORMAT_KEY)
+                        .toLowerCase(),
+                seaTunnelRowType,
+                dorisConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY),
+                dorisConfig.getEnableDelete());
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
new file mode 100644
index 000000000..696d5c409
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.sink.writer;
+
+import org.apache.seatunnel.connectors.doris.config.DorisConfig;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
+import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
+import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
+import org.apache.seatunnel.connectors.doris.sink.HttpPutBuilder;
+import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
+import org.apache.seatunnel.connectors.doris.util.ResponseUtil;
+
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.InputStreamEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
+import static org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
+import static org.apache.seatunnel.connectors.doris.util.ResponseUtil.LABEL_EXIST_PATTERN;
+
+/** load data to doris. */
+@Slf4j
+public class DorisStreamLoad implements Serializable {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final int HTTP_TEMPORARY_REDIRECT = 200;
+    private final LabelGenerator labelGenerator;
+    private final byte[] lineDelimiter;
+    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
+    private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
+    private static final String JOB_EXIST_FINISHED = "FINISHED";
+
+    private String loadUrlStr;
+    private String hostPort;
+    private final String abortUrlStr;
+    private final String user;
+    private final String passwd;
+    private final String db;
+    private final String table;
+    private final boolean enable2PC;
+    private final boolean enableDelete;
+    private final Properties streamLoadProp;
+    private final RecordStream recordStream;
+    private Future<CloseableHttpResponse> pendingLoadFuture;
+    private final CloseableHttpClient httpClient;
+    private final ExecutorService executorService;
+    private boolean loadBatchFirstRecord;
+
+    /**
+     * Creates a stream load bound to one backend and one {@code db.table} target.
+     *
+     * @param hostPort backend address the load URLs are built from
+     * @param dorisConfig configuration providing table identifier, credentials, 2PC/delete
+     *     flags, stream load properties and buffer sizing
+     * @param labelGenerator generator for load labels
+     * @param httpClient HTTP client used for all requests
+     * @throws IllegalArgumentException if the table identifier is not of the form {@code
+     *     db.table}
+     */
+    public DorisStreamLoad(
+            String hostPort,
+            DorisConfig dorisConfig,
+            LabelGenerator labelGenerator,
+            CloseableHttpClient httpClient) {
+        this.hostPort = hostPort;
+        String tableIdentifier = dorisConfig.getTableIdentifier();
+        String[] tableInfo =
+                tableIdentifier == null ? new String[0] : tableIdentifier.split("\\.");
+        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+        if (tableInfo.length < 2) {
+            throw new IllegalArgumentException(
+                    "table identifier must be in the form 'db.table', but was: "
+                            + tableIdentifier);
+        }
+        this.db = tableInfo[0];
+        this.table = tableInfo[1];
+        this.user = dorisConfig.getUsername();
+        this.passwd = dorisConfig.getPassword();
+        this.labelGenerator = labelGenerator;
+        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, table);
+        this.abortUrlStr = String.format(ABORT_URL_PATTERN, hostPort, db);
+        this.enable2PC = dorisConfig.getEnable2PC();
+        this.streamLoadProp = dorisConfig.getStreamLoadProps();
+        this.enableDelete = dorisConfig.getEnableDelete();
+        this.httpClient = httpClient;
+        // Single worker thread with an unbounded queue.
+        this.executorService =
+                new ThreadPoolExecutor(
+                        1,
+                        1,
+                        0L,
+                        TimeUnit.MILLISECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new ThreadFactoryBuilder().setNameFormat("stream-load-upload").build());
+        this.recordStream =
+                new RecordStream(dorisConfig.getBufferSize(), dorisConfig.getBufferCount());
+        // Encode with an explicit charset so the delimiter bytes do not depend on the JVM's
+        // platform default.
+        lineDelimiter =
+                streamLoadProp
+                        .getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT)
+                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        loadBatchFirstRecord = true;
+    }
+
+    /** Returns the database part of the configured table identifier. */
+    public String getDb() {
+        return this.db;
+    }
+
+    /** Returns the backend address currently used for the load URL. */
+    public String getHostPort() {
+        return this.hostPort;
+    }
+
+    /**
+     * Switches to another backend and rebuilds the load URL for it.
+     *
+     * <p>NOTE(review): only the load URL is rebuilt; {@code abortUrlStr} keeps the address given
+     * to the constructor. Confirm that is intended.
+     *
+     * @param hostPort the new backend address
+     */
+    public void setHostPort(String hostPort) {
+        final String rebuiltLoadUrl =
+                String.format(LOAD_URL_PATTERN, hostPort, this.db, this.table);
+        this.hostPort = hostPort;
+        this.loadUrlStr = rebuiltLoadUrl;
+    }
+
+    /** Returns the future of the load request in flight, or {@code null} if none was set. */
+    public Future<CloseableHttpResponse> getPendingLoadFuture() {
+        return this.pendingLoadFuture;
+    }
+
+    /**
+     * Aborts pre-committed transactions left behind under this writer's labels.
+     *
+     * <p>Starting at {@code chkID}, an empty 2PC load is submitted with the label of each
+     * checkpoint id in turn. While the backend answers that the label already exists, the
+     * transaction id is taken from the reply message and aborted. The first label that does not
+     * exist yet ends the scan, and the transaction just opened for it is aborted as well.
+     *
+     * @param labelSuffix only used in log messages
+     * @param chkID first checkpoint id to probe
+     * @throws Exception if a request fails, an existing label belongs to a finished job, or the
+     *     reply carries no transaction id
+     */
+    public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
+        log.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
+        try {
+            for (long probeChkId = chkID; ; probeChkId++) {
+                final String label = labelGenerator.generateLabel(probeChkId);
+                HttpPutBuilder probe = new HttpPutBuilder();
+                probe.setUrl(loadUrlStr)
+                        .baseAuth(user, passwd)
+                        .addCommonHeader()
+                        .enable2PC()
+                        .setLabel(label)
+                        .setEmptyEntity()
+                        .addProperties(streamLoadProp);
+                RespContent reply = handlePreCommitResponse(httpClient.execute(probe.build()));
+                checkState("true".equals(reply.getTwoPhaseCommit()));
+
+                if (!LoadStatus.LABEL_ALREADY_EXIST.equals(reply.getStatus())) {
+                    // Unused label: abort the transaction this probe opened and stop scanning.
+                    log.info("abort {} for check label {}.", reply.getTxnId(), label);
+                    abortTransaction(reply.getTxnId());
+                    break;
+                }
+
+                // label already exist and job finished
+                if (JOB_EXIST_FINISHED.equals(reply.getExistingJobStatus())) {
+                    throw new DorisConnectorException(
+                            DorisConnectorErrorCode.STREAM_LOAD_FAILED,
+                            "Load status is "
+                                    + LoadStatus.LABEL_ALREADY_EXIST
+                                    + " and load job finished, "
+                                    + "change you label prefix or restore from latest savepoint!");
+                }
+
+                // job not finished, abort.
+                Matcher existingLabel = LABEL_EXIST_PATTERN.matcher(reply.getMessage());
+                if (!existingLabel.find()) {
+                    throw new DorisConnectorException(
+                            DorisConnectorErrorCode.STREAM_LOAD_FAILED,
+                            "Load Status is "
+                                    + LoadStatus.LABEL_ALREADY_EXIST
+                                    + ", but no txnID associated with it!"
+                                    + "response: "
+                                    + reply);
+                }
+                checkState(label.equals(existingLabel.group(1)));
+                long txnId = Long.parseLong(existingLabel.group(2));
+                log.info("abort {} for exist label {}", txnId, label);
+                abortTransaction(txnId);
+            }
+        } catch (Exception e) {
+            log.warn("failed to stream load data", e);
+            throw e;
+        }
+        log.info("abort for labelSuffix {} finished", labelSuffix);
+    }
+
+    /**
+     * Appends one serialized record to the record stream.
+     *
+     * <p>Every record except the first of a load batch is preceded by the line delimiter, so
+     * delimiters end up between records rather than after each one.
+     *
+     * @param record the bytes of the record to append
+     * @throws IOException if writing to the record stream fails
+     */
+    public void writeRecord(byte[] record) throws IOException {
+        final boolean needsDelimiter = !loadBatchFirstRecord;
+        loadBatchFirstRecord = false;
+        if (needsDelimiter) {
+            recordStream.write(lineDelimiter);
+        }
+        recordStream.write(record);
+    }
+
+    /**
+     * Exposes the record stream that {@code writeRecord} writes into.
+     *
+     * @return the record stream used by this loader
+     */
+    @VisibleForTesting
+    public RecordStream getRecordStream() {
+        return this.recordStream;
+    }
+
+    /**
+     * Converts the HTTP response of a stream load request into a {@link RespContent}.
+     *
+     * <p>The response is accepted only when its status code equals the class constant {@code
+     * HTTP_TEMPORARY_REDIRECT} and it carries an entity; the entity body is then parsed as JSON.
+     * Any other response is rejected with an exception that carries the status line.
+     *
+     * <p>The response is always closed before this method returns, on success and on failure, so
+     * the underlying connection is released even when the entity was never read.
+     *
+     * @param response the response of a stream load request; it is closed by this method
+     * @return the parsed response body
+     * @throws DorisConnectorException if the status code is unexpected or the entity is missing
+     * @throws Exception if the body cannot be read or parsed
+     */
+    public RespContent handlePreCommitResponse(CloseableHttpResponse response) throws Exception {
+        try {
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HTTP_TEMPORARY_REDIRECT && response.getEntity() != null) {
+                String loadResult = EntityUtils.toString(response.getEntity());
+                log.info("load Result {}", loadResult);
+                return OBJECT_MAPPER.readValue(loadResult, RespContent.class);
+            }
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.STREAM_LOAD_FAILED,
+                    response.getStatusLine().toString());
+        } finally {
+            // Release the connection; a close failure must not mask the real outcome.
+            try {
+                response.close();
+            } catch (IOException e) {
+                log.warn("failed to close stream load response", e);
+            }
+        }
+    }
+
+    /**
+     * Ends the current load batch and waits for the result of the pending HTTP request.
+     *
+     * <p>The record stream is told that input has ended, then the future created by {@code
+     * startLoad} is awaited and its response is converted with {@code handlePreCommitResponse}.
+     *
+     * @return the parsed response of the finished load
+     * @throws IOException if ending the record stream input fails
+     * @throws IllegalStateException if no load is pending
+     * @throws DorisConnectorException if waiting for, or handling, the response fails; when the
+     *     wait was interrupted the thread's interrupt flag is restored before throwing
+     */
+    public RespContent stopLoad() throws IOException {
+        recordStream.endInput();
+        log.info("stream load stopped.");
+        checkState(pendingLoadFuture != null);
+        try {
+            return handlePreCommitResponse(pendingLoadFuture.get());
+        } catch (InterruptedException e) {
+            // Keep the interruption visible to the caller's thread instead of swallowing it.
+            Thread.currentThread().interrupt();
+            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+        } catch (Exception e) {
+            throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
+        }
+    }
+
+    /**
+     * Starts a new load batch under the given label.
+     *
+     * <p>The record stream is switched to input mode and wrapped in an HTTP entity; the PUT
+     * request is then executed on the executor service, and the resulting future is kept as the
+     * pending load. Two-phase commit is enabled on the request only when {@code enable2PC} is
+     * set.
+     *
+     * @param label the label to send with this load
+     * @throws IOException declared by the signature; failures raised while preparing or
+     *     submitting the request are logged and rethrown
+     */
+    public void startLoad(String label) throws IOException {
+        loadBatchFirstRecord = true;
+        HttpPutBuilder request = new HttpPutBuilder();
+        recordStream.startInput();
+        log.info("stream load started for {}", label);
+        try {
+            InputStreamEntity body = new InputStreamEntity(recordStream);
+            request.setUrl(loadUrlStr)
+                    .baseAuth(user, passwd)
+                    .addCommonHeader()
+                    .addHiddenColumns(enableDelete)
+                    .setLabel(label)
+                    .setEntity(body)
+                    .addProperties(streamLoadProp);
+            if (enable2PC) {
+                request.enable2PC();
+            }
+            // The request is built and executed on the executor thread.
+            pendingLoadFuture =
+                    executorService.submit(
+                            () -> {
+                                log.info("start execute load");
+                                return httpClient.execute(request.build());
+                            });
+        } catch (Exception e) {
+            log.warn("failed to stream load data with label: " + label, e);
+            throw e;
+        }
+    }
+
+    public void abortTransaction(long txnID) throws Exception {
+        HttpPutBuilder builder = new HttpPutBuilder();
+        builder.setUrl(abortUrlStr)
+                .baseAuth(user, passwd)
+                .addCommonHeader()
+                .addTxnId(txnID)
+                .setEmptyEntity()
+                .abort();
+        CloseableHttpResponse response = httpClient.execute(builder.build());
+
+        int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode != HTTP_TEMPORARY_REDIRECT || response.getEntity() == null) {
+            log.warn("abort transaction response: " + response.getStatusLine().toString());
+            throw new DorisConnectorException(
+                    DorisConnectorErrorCode.STREAM_LOAD_FAILED,
+                    "Fail to abort transaction " + txnID + " with url " + abortUrlStr);
+        }
+
+        ObjectMapper mapper = new ObjectMapper();
+        String loadResult = EntityUtils.toString(response.getEntity());
+        Map<String, String> res =
+                mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {});
+        if (!LoadStatus.SUCCESS.equals(res.get("status"))) {
+            if (ResponseUtil.isCommitted(res.get("msg"))) {
+                throw new DorisConnectorException(
+                        DorisConnectorErrorCode.STREAM_LOAD_FAILED,
+                        "try abort committed transaction, " + "do you recover from old savepoint?");
+            }
+            log.warn("Fail to abort transaction. txnId: {}, error: {}", txnID, res.get("msg"));
+        }
+    }
+
+    /**
+     * Releases the HTTP client and stops the executor service.
+     *
+     * <p>The executor service is shut down even when closing the HTTP client fails, so a failing
+     * client cannot leave worker threads behind.
+     *
+     * @throws IOException if closing the HTTP client fails
+     */
+    public void close() throws IOException {
+        try {
+            if (null != httpClient) {
+                try {
+                    httpClient.close();
+                } catch (IOException e) {
+                    throw new IOException("Closing httpClient failed.", e);
+                }
+            }
+        } finally {
+            if (null != executorService) {
+                executorService.shutdownNow();
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
similarity index 60%
rename from seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
rename to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
index 9f3d17050..1f4fdbb45 100644
--- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/DorisFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LabelGenerator.java
@@ -15,16 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris;
+package org.apache.seatunnel.connectors.doris.sink.writer;
 
-import org.apache.seatunnel.connectors.doris.sink.DorisSinkFactory;
+/** Generates labels for stream load. */
+public class LabelGenerator {
+    private String labelPrefix;
+    private boolean enable2PC;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+    /**
+     * Creates a generator that remembers the given prefix and two-phase-commit flag.
+     *
+     * @param labelPrefix text placed in front of every generated label
+     * @param enable2PC selects how {@code generateLabel} builds the part after the prefix
+     */
+    public LabelGenerator(String labelPrefix, boolean enable2PC) {
+        this.labelPrefix = labelPrefix;
+        this.enable2PC = enable2PC;
+    }
 
-public class DorisFactoryTest {
-    @Test
-    void optionRule() {
-        Assertions.assertNotNull((new DorisSinkFactory()).optionRule());
+    /**
+     * Builds a label of the form {@code <labelPrefix>_<suffix>}.
+     *
+     * <p>With two-phase commit enabled the suffix is the checkpoint id, so the same checkpoint
+     * always yields the same label. Otherwise the suffix is the current time in milliseconds and
+     * {@code chkId} is ignored.
+     *
+     * @param chkId checkpoint id, used only when two-phase commit is enabled
+     * @return the generated label
+     */
+    public String generateLabel(long chkId) {
+        if (enable2PC) {
+            return labelPrefix + "_" + chkId;
+        }
+        return labelPrefix + "_" + System.currentTimeMillis();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LoadConstants.java
similarity index 53%
copy from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LoadConstants.java
index 7492ec259..7ee9eff9e 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/LoadConstants.java
@@ -15,17 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris.client;
+package org.apache.seatunnel.connectors.doris.sink.writer;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.util.List;
-
-@AllArgsConstructor
-@Data
-public class DorisFlushTuple {
-    private String label;
-    private Long bytes;
-    private List<byte[]> rows;
+/**
+ * Constants for load.
+ *
+ * <p>The {@code *_KEY} constants are property names and the {@code *_DEFAULT} constants are the
+ * values used when the matching property is absent (for example, the line delimiter is looked up
+ * with {@code LINE_DELIMITER_KEY} and falls back to {@code LINE_DELIMITER_DEFAULT}).
+ */
+public class LoadConstants {
+    // Property name for the column list - presumably a stream load property; confirm with callers.
+    public static final String COLUMNS_KEY = "columns";
+    // Property name and fallback value (a tab) for the separator between fields.
+    public static final String FIELD_DELIMITER_KEY = "column_separator";
+    public static final String FIELD_DELIMITER_DEFAULT = "\t";
+    // Property name and fallback value (a newline) for the separator between records.
+    public static final String LINE_DELIMITER_KEY = "line_delimiter";
+    public static final String LINE_DELIMITER_DEFAULT = "\n";
+    // Property name for the data format, followed by the two format names defined here.
+    public static final String FORMAT_KEY = "format";
+    public static final String JSON = "json";
+    public static final String CSV = "csv";
+    // The two characters backslash and N - presumably the textual form of a null value; verify.
+    public static final String NULL_VALUE = "\\N";
+    // Name of the delete-sign column - presumably the hidden column used for deletes; verify.
+    public static final String DORIS_DELETE_SIGN = "__DORIS_DELETE_SIGN__";
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
new file mode 100644
index 000000000..7c861658f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordBuffer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.sink.writer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/** Channel of record stream and HTTP data stream. */
+@Slf4j
+public class RecordBuffer {
+    BlockingQueue<ByteBuffer> writeQueue;
+    BlockingQueue<ByteBuffer> readQueue;
+    int bufferCapacity;
+    int queueSize;
+    ByteBuffer currentWriteBuffer;
+    ByteBuffer currentReadBuffer;
+
+    public RecordBuffer(int capacity, int queueSize) {
+        log.info("init RecordBuffer capacity {}, count {}", capacity, queueSize);
+        checkState(capacity > 0);
+        checkState(queueSize > 1);
+        this.writeQueue = new ArrayBlockingQueue<>(queueSize);
+        for (int index = 0; index < queueSize; index++) {
+            this.writeQueue.add(ByteBuffer.allocate(capacity));
+        }
+        readQueue = new LinkedBlockingDeque<>();
+        this.bufferCapacity = capacity;
+        this.queueSize = queueSize;
+    }
+
+    public void startBufferData() {
+        log.info(
+                "start buffer data, read queue size {}, write queue size {}",
+                readQueue.size(),
+                writeQueue.size());
+        checkState(readQueue.size() == 0);
+        checkState(writeQueue.size() == queueSize);
+        for (ByteBuffer byteBuffer : writeQueue) {
+            checkState(byteBuffer.position() == 0);
+            checkState(byteBuffer.remaining() == bufferCapacity);
+        }
+    }
+
+    public void stopBufferData() throws IOException {
+        try {
+            // add Empty buffer as finish flag.
+            boolean isEmpty = false;
+            if (currentWriteBuffer != null) {
+                currentWriteBuffer.flip();
+                // check if the current write buffer is empty.
+                isEmpty = currentWriteBuffer.limit() == 0;
+                readQueue.put(currentWriteBuffer);
+                currentWriteBuffer = null;
+            }
+            if (!isEmpty) {
+                ByteBuffer byteBuffer = writeQueue.take();
+                byteBuffer.flip();
+                checkState(byteBuffer.limit() == 0);
+                readQueue.put(byteBuffer);
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    public void write(byte[] buf) throws InterruptedException {
+        int wPos = 0;
+        do {
+            if (currentWriteBuffer == null) {
+                currentWriteBuffer = writeQueue.take();
+            }
+            int available = currentWriteBuffer.remaining();
+            int nWrite = Math.min(available, buf.length - wPos);
+            currentWriteBuffer.put(buf, wPos, nWrite);
+            wPos += nWrite;
+            if (currentWriteBuffer.remaining() == 0) {
+                currentWriteBuffer.flip();
+                readQueue.put(currentWriteBuffer);
+                currentWriteBuffer = null;
+            }
+        } while (wPos != buf.length);
+    }
+
+    public int read(byte[] buf) throws InterruptedException {
+        if (currentReadBuffer == null) {
+            currentReadBuffer = readQueue.take();
+        }
+        // add empty buffer as end flag
+        if (currentReadBuffer.limit() == 0) {
+            recycleBuffer(currentReadBuffer);
+            currentReadBuffer = null;
+            checkState(readQueue.size() == 0);
+            return -1;
+        }
+        int available = currentReadBuffer.remaining();
+        int nRead = Math.min(available, buf.length);
+        currentReadBuffer.get(buf, 0, nRead);
+        if (currentReadBuffer.remaining() == 0) {
+            recycleBuffer(currentReadBuffer);
+            currentReadBuffer = null;
+        }
+        return nRead;
+    }
+
+    private void recycleBuffer(ByteBuffer buffer) throws InterruptedException {
+        buffer.clear();
+        writeQueue.put(buffer);
+    }
+
+    public int getWriteQueueSize() {
+        return writeQueue.size();
+    }
+
+    public int getReadQueueSize() {
+        return readQueue.size();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
new file mode 100644
index 000000000..73d33e3dd
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/RecordStream.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.sink.writer;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/** Record Stream for writing record. */
+public class RecordStream extends InputStream {
+    private final RecordBuffer recordBuffer;
+
+    @Override
+    public int read() throws IOException {
+        return 0;
+    }
+
+    public RecordStream(int bufferSize, int bufferCount) {
+        this.recordBuffer = new RecordBuffer(bufferSize, bufferCount);
+    }
+
+    public void startInput() {
+        recordBuffer.startBufferData();
+    }
+
+    public void endInput() throws IOException {
+        recordBuffer.stopBufferData();
+    }
+
+    @Override
+    public int read(byte[] buff) throws IOException {
+        try {
+            return recordBuffer.read(buff);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void write(byte[] buff) throws IOException {
+        try {
+            recordBuffer.write(buff);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DelimiterParserUtil.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DelimiterParserUtil.java
deleted file mode 100644
index fda7b7efd..000000000
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DelimiterParserUtil.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.util;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-
-import com.google.common.base.Strings;
-
-import java.io.StringWriter;
-
-public class DelimiterParserUtil {
-    private static final int SHIFT = 4;
-
-    private static final String HEX_STRING = "0123456789ABCDEF";
-
-    public static String parse(String sp, String dSp) throws RuntimeException {
-        if (Strings.isNullOrEmpty(sp)) {
-            return dSp;
-        }
-        if (!sp.toUpperCase().startsWith("\\X")) {
-            return sp;
-        }
-        String hexStr = sp.substring(2);
-        // check hex str
-        if (hexStr.isEmpty()) {
-            throw new DorisConnectorException(
-                    CommonErrorCode.ILLEGAL_ARGUMENT,
-                    "Failed to parse delimiter: `Hex str is empty`");
-        }
-        if (hexStr.length() % 2 != 0) {
-            throw new DorisConnectorException(
-                    CommonErrorCode.ILLEGAL_ARGUMENT,
-                    "Failed to parse delimiter: `Hex str is empty`");
-        }
-        for (char hexChar : hexStr.toUpperCase().toCharArray()) {
-            if (HEX_STRING.indexOf(hexChar) == -1) {
-                throw new DorisConnectorException(
-                        CommonErrorCode.ILLEGAL_ARGUMENT,
-                        "Failed to parse delimiter: `Hex str is empty`");
-            }
-        }
-        // transform to separator
-        StringWriter writer = new StringWriter();
-        for (byte b : hexStrToBytes(hexStr)) {
-            writer.append((char) b);
-        }
-        return writer.toString();
-    }
-
-    private static byte[] hexStrToBytes(String hexStr) {
-        String upperHexStr = hexStr.toUpperCase();
-        int length = upperHexStr.length() / 2;
-        char[] hexChars = upperHexStr.toCharArray();
-        byte[] bytes = new byte[length];
-        for (int i = 0; i < length; i++) {
-            int pos = i * 2;
-            bytes[i] = (byte) (charToByte(hexChars[pos]) << SHIFT | charToByte(hexChars[pos + 1]));
-        }
-        return bytes;
-    }
-
-    private static byte charToByte(char c) {
-        return (byte) HEX_STRING.indexOf(c);
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
similarity index 50%
rename from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
rename to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
index 7492ec259..8de43d303 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisFlushTuple.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris.client;
+package org.apache.seatunnel.connectors.doris.util;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.util.List;
-
-@AllArgsConstructor
-@Data
-public class DorisFlushTuple {
-    private String label;
-    private Long bytes;
-    private List<byte[]> rows;
+/**
+ * Message templates for errors.
+ *
+ * <p>The templates contain {@code {}} placeholders - presumably filled in by SLF4J-style
+ * formatting; confirm against the callers.
+ */
+public abstract class ErrorMessages {
+    // Placeholders: the value that failed to parse, then the original string.
+    public static final String PARSE_NUMBER_FAILED_MESSAGE =
+            "Parse '{}' to number failed. Original string is '{}'.";
+    // Placeholders: the value that failed to parse, then the original string.
+    public static final String PARSE_BOOL_FAILED_MESSAGE =
+            "Parse '{}' to boolean failed. Original string is '{}'.";
+    // Placeholder: the connection target.
+    public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {} failed.";
+    // Placeholders: the argument name, then its value.
+    public static final String ILLEGAL_ARGUMENT_MESSAGE =
+            "argument '{}' is illegal, value is '{}'.";
+    // No placeholders.
+    public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come here.";
+    // Placeholders: the server, then the status, then the error message.
+    public static final String DORIS_INTERNAL_FAIL_MESSAGE =
+            "Doris server '{}' internal failed, status is '{}', error message is '{}'";
 }
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/HttpUtil.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/HttpUtil.java
new file mode 100644
index 000000000..46d1126c2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/HttpUtil.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.util;
+
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+
+/**
+ * Builds HTTP clients whose redirect strategy treats every HTTP method as redirectable.
+ */
+public class HttpUtil {
+    private final HttpClientBuilder httpClientBuilder =
+            HttpClients.custom().setRedirectStrategy(new AlwaysRedirectStrategy());
+
+    /**
+     * Builds a new client from the builder held by this instance.
+     *
+     * @return a newly built HTTP client
+     */
+    public CloseableHttpClient getHttpClient() {
+        return httpClientBuilder.build();
+    }
+
+    /** Redirect strategy that reports every HTTP method as redirectable. */
+    private static final class AlwaysRedirectStrategy extends DefaultRedirectStrategy {
+        @Override
+        protected boolean isRedirectable(String method) {
+            return true;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java
new file mode 100644
index 000000000..3e914d7d2
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/IOUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.util;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.Properties;
+
+/** Converts {@link Properties} to and from their textual {@code store}/{@code load} form. */
+public class IOUtils {
+    /**
+     * Serializes properties with {@link Properties#store(java.io.Writer, String)}.
+     *
+     * @param props the properties to serialize; may be {@code null}
+     * @return the serialized text, or an empty string when {@code props} is {@code null}
+     * @throws IllegalArgumentException if writing the properties fails
+     */
+    public static String propsToString(Properties props) throws IllegalArgumentException {
+        if (props == null) {
+            return "";
+        }
+        StringWriter buffer = new StringWriter();
+        try {
+            props.store(buffer, "");
+        } catch (IOException ex) {
+            throw new IllegalArgumentException("Cannot parse props to String.", ex);
+        }
+        return buffer.toString();
+    }
+
+    /**
+     * Parses text with {@link Properties#load(java.io.Reader)}.
+     *
+     * @param source the text to parse; may be {@code null}
+     * @return the parsed properties, empty when {@code source} is {@code null}
+     * @throws IllegalArgumentException if reading the text fails
+     */
+    public static Properties propsFromString(String source) throws IllegalArgumentException {
+        Properties parsed = new Properties();
+        if (source == null) {
+            return parsed;
+        }
+        try {
+            parsed.load(new StringReader(source));
+        } catch (IOException ex) {
+            throw new IllegalArgumentException("Cannot parse props from String.", ex);
+        }
+        return parsed;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ResponseUtil.java
similarity index 51%
copy from seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java
copy to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ResponseUtil.java
index b049a01c2..0f5969ddf 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/exception/DorisConnectorErrorCode.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ResponseUtil.java
@@ -15,28 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.doris.exception;
+package org.apache.seatunnel.connectors.doris.util;
 
-import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import java.util.regex.Pattern;
 
-public enum DorisConnectorErrorCode implements SeaTunnelErrorCode {
-    WRITE_RECORDS_FAILED("DORIS-01", "Writing records to Doris failed.");
+/** Utility for handling responses. */
+public class ResponseUtil {
+    public static final Pattern LABEL_EXIST_PATTERN =
+            Pattern.compile(
+                    "errCode = 2, detailMessage = Label \\[(.*)\\] "
+                            + "has already been used, relate to txn \\[(\\d+)\\]");
+    public static final Pattern COMMITTED_PATTERN =
+            Pattern.compile(
+                    "errCode = 2, detailMessage = transaction \\[(\\d+)\\] "
+                            + "is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
 
-    private final String code;
-    private final String description;
-
-    DorisConnectorErrorCode(String code, String description) {
-        this.code = code;
-        this.description = description;
-    }
-
-    @Override
-    public String getCode() {
-        return code;
-    }
-
-    @Override
-    public String getDescription() {
-        return description;
+    /**
+     * Tells whether a message matches {@code COMMITTED_PATTERN} in full, i.e. whether it says
+     * that a transaction is already committed or visible.
+     *
+     * @param msg the message to test; may be {@code null}
+     * @return {@code true} if the whole message matches the pattern; {@code false} otherwise,
+     *     including when {@code msg} is {@code null}
+     */
+    public static boolean isCommitted(String msg) {
+        // A response may carry no message at all; treat that as "not committed" instead of an NPE.
+        return msg != null && COMMITTED_PATTERN.matcher(msg).matches();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-doris/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
similarity index 56%
copy from seatunnel-connectors-v2/connector-doris/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
index f06550674..347c93d28 100644
--- a/seatunnel-connectors-v2/connector-doris/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/pom.xml
@@ -1,69 +1,43 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
-
        http://www.apache.org/licenses/LICENSE-2.0
-
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
-
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-connectors-v2</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <version>${revision}</version>
     </parent>
 
-    <artifactId>connector-doris</artifactId>
-    <name>SeaTunnel : Connectors V2 : Doris</name>
-
-    <properties>
-        <httpclient.version>4.5.13</httpclient.version>
-        <httpcore.version>4.4.4</httpcore.version>
-    </properties>
+    <artifactId>connector-doris-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Doris</name>
 
     <dependencies>
+        <!-- SeaTunnel connectors -->
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>${httpclient.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpcore</artifactId>
-            <version>${httpcore.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-format-json</artifactId>
+            <artifactId>connector-doris</artifactId>
             <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-format-text</artifactId>
+            <artifactId>connector-fake</artifactId>
             <version>${project.version}</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
new file mode 100644
index 000000000..8983269a1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.doris;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.given;
+
+/**
+ * End-to-end test for the Doris sink: runs the {@code /write-cdc-changelog-to-doris.conf} job
+ * against a Doris container and checks the rows left in the sink table over JDBC.
+ */
+@Slf4j
+@Disabled
+public class DorisCDCSinkIT extends TestSuiteBase implements TestResource {
+    private static final String DOCKER_IMAGE = "zykkk/doris:1.2.2.1-avx2-x86_84";
+    private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+    private static final String HOST = "doris_cdc_e2e";
+    private static final int DOCKER_PORT = 9030;
+    private static final int PORT = 8961;
+    private static final String URL = "jdbc:mysql://%s:" + PORT;
+    private static final String USERNAME = "root";
+    private static final String PASSWORD = "";
+    private static final String DATABASE = "test";
+    private static final String SINK_TABLE = "e2e_table_sink";
+    private static final String DRIVER_JAR =
+            "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+    private static final String SET_SQL =
+            "ADMIN SET FRONTEND CONFIG (\"enable_batch_delete_by_default\" = \"true\")";
+    private static final String CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS " + DATABASE;
+    private static final String DDL_SINK =
+            "CREATE TABLE IF NOT EXISTS "
+                    + DATABASE
+                    + "."
+                    + SINK_TABLE
+                    + " (\n"
+                    + "  uuid   BIGINT,\n"
+                    + "  name    VARCHAR(128),\n"
+                    + "  score   INT\n"
+                    + ")ENGINE=OLAP\n"
+                    + "UNIQUE KEY(`uuid`)\n"
+                    + "DISTRIBUTED BY HASH(`uuid`) BUCKETS 1\n"
+                    + "PROPERTIES (\n"
+                    + "\"replication_allocation\" = \"tag.location.default: 1\""
+                    + ")";
+
+    private Connection jdbcConnection;
+    private GenericContainer<?> dorisServer;
+
+    /**
+     * Starts the Doris container, waits until a JDBC connection can be established and then
+     * creates the database and sink table used by the test.
+     */
+    @BeforeAll
+    @Override
+    public void startUp() {
+        dorisServer =
+                new GenericContainer<>(DOCKER_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(HOST)
+                        .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
+        dorisServer.setPortBindings(Lists.newArrayList(String.format("%s:%s", PORT, DOCKER_PORT)));
+        Startables.deepStart(Stream.of(dorisServer)).join();
+        log.info("doris container started");
+        // Doris needs a while after the container is up; retry connecting until it succeeds.
+        given().ignoreExceptions()
+                .await()
+                .atMost(10000, TimeUnit.SECONDS)
+                .untilAsserted(this::initializeJdbcConnection);
+        initializeJdbcTable();
+    }
+
+    /** Closes the JDBC connection and stops the Doris container. */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            if (jdbcConnection != null) {
+                jdbcConnection.close();
+            }
+        } finally {
+            // Stop the container even when closing the connection fails.
+            if (dorisServer != null) {
+                dorisServer.close();
+            }
+        }
+    }
+
+    /**
+     * Runs the changelog job and asserts that exactly the expected rows remain in the sink
+     * table.
+     */
+    @TestTemplate
+    public void testDorisSink(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/write-cdc-changelog-to-doris.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE);
+        Set<List<Object>> actual = new HashSet<>();
+        try (Statement sinkStatement = jdbcConnection.createStatement();
+                ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql)) {
+            while (sinkResultSet.next()) {
+                List<Object> row =
+                        Arrays.asList(
+                                sinkResultSet.getLong("uuid"),
+                                sinkResultSet.getString("name"),
+                                sinkResultSet.getInt("score"));
+                actual.add(row);
+            }
+        }
+        Set<List<Object>> expected =
+                Stream.<List<Object>>of(Arrays.asList(1L, "A_1", 100), Arrays.asList(3L, "C", 100))
+                        .collect(Collectors.toSet());
+        // Sets are unordered: compare with equals, not element-by-element iteration.
+        Assertions.assertEquals(expected, actual);
+    }
+
+    /**
+     * Loads the MySQL driver from {@link #DRIVER_JAR}, connects to Doris and applies
+     * {@link #SET_SQL}. Invoked repeatedly from {@link #startUp()} until it stops throwing.
+     */
+    private void initializeJdbcConnection()
+            throws SQLException, ClassNotFoundException, InstantiationException,
+                    IllegalAccessException, MalformedURLException {
+        URLClassLoader urlClassLoader =
+                new URLClassLoader(
+                        new URL[] {new URL(DRIVER_JAR)}, DorisCDCSinkIT.class.getClassLoader());
+        Thread.currentThread().setContextClassLoader(urlClassLoader);
+        Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance();
+        Properties props = new Properties();
+        props.put("user", USERNAME);
+        props.put("password", PASSWORD);
+        Connection connection = driver.connect(String.format(URL, dorisServer.getHost()), props);
+        try (Statement statement = connection.createStatement()) {
+            statement.execute(SET_SQL);
+        } catch (SQLException e) {
+            // Retried by startUp(): close the connection rather than leak one per attempt.
+            try {
+                connection.close();
+            } catch (SQLException closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+        jdbcConnection = connection;
+    }
+
+    /** Creates the test database and the sink table. */
+    private void initializeJdbcTable() {
+        try (Statement statement = jdbcConnection.createStatement()) {
+            // create database
+            statement.execute(CREATE_DATABASE);
+            // create sink table
+            statement.execute(DDL_SINK);
+        } catch (SQLException e) {
+            throw new RuntimeException("Initializing table failed!", e);
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
new file mode 100644
index 000000000..c6e423ae1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/write-cdc-changelog-to-doris.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+    execution.parallelism = 1
+    job.mode = "BATCH"
+}
+
+source {
+    FakeSource {
+        schema = {
+            fields {
+                pk_id = bigint
+                name = string
+                score = int
+            }
+        }
+        rows = [
+            {
+                kind = INSERT
+                fields = [1, "A", 100]
+            },
+            {
+                kind = INSERT
+                fields = [2, "B", 100]
+            },
+            {
+                kind = INSERT
+                fields = [3, "C", 100]
+            },
+            {
+                kind = UPDATE_BEFORE
+                fields = [1, "A", 100]
+            },
+            {
+                kind = UPDATE_AFTER
+                fields = [1, "A_1", 100]
+            },
+            {
+                kind = DELETE
+                fields = [2, "B", 100]
+            }
+        ]
+    }
+}
+
+sink {
+    Doris {
+        fenodes = "doris_cdc_e2e:8030"
+        username = root
+        password = ""
+        table.identifier = "test.e2e_table_sink"
+        sink.label-prefix = "test-cdc"
+        sink.enable-2pc = "false"
+        sink.enable-delete = "true"
+        doris.config {
+            format="json"
+            read_json_by_line="true"
+        }
+    }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
index 536a5488f..24bd60677 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
@@ -67,7 +67,7 @@ import static org.awaitility.Awaitility.given;
 @Slf4j
 @Disabled
 public class JdbcDorisIT extends TestSuiteBase implements TestResource {
-    private static final String DOCKER_IMAGE = "taozex/doris:tagname";
+    private static final String DOCKER_IMAGE = "zykkk/doris:1.2.2.1-avx2-x86_84";
     private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
     private static final String HOST = "doris_e2e";
     private static final int DOCKER_PORT = 9030;
@@ -186,6 +186,7 @@ public class JdbcDorisIT extends TestSuiteBase implements TestResource {
                 new GenericContainer<>(DOCKER_IMAGE)
                         .withNetwork(TestSuiteBase.NETWORK)
                         .withNetworkAliases(HOST)
+                        .withPrivilegedMode(true)
                         .withLogConsumer(
                                 new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         dorisServer.setPortBindings(Lists.newArrayList(String.format("%s:%s", PORT, DOCKER_PORT)));
@@ -246,8 +247,9 @@ public class JdbcDorisIT extends TestSuiteBase implements TestResource {
         try {
             assertHasData(SINK_TABLE);
 
-            String sourceSql = String.format("select * from %s.%s", DATABASE, SOURCE_TABLE);
-            String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE);
+            String sourceSql =
+                    String.format("select * from %s.%s order by 1", DATABASE, SOURCE_TABLE);
+            String sinkSql = String.format("select * from %s.%s order by 1", DATABASE, SINK_TABLE);
             List<String> columnList =
                     Arrays.stream(COLUMN_STRING.split(","))
                             .map(String::trim)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/doris-jdbc-to-doris.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/doris-jdbc-to-doris.conf
index 3d2ea5f9b..ce07bbe26 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/doris-jdbc-to-doris.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/doris-jdbc-to-doris.conf
@@ -35,16 +35,15 @@ transform {
 
 sink {
     Doris {
-        nodeUrls = ["doris_e2e:8030"]
+        fenodes = "doris_e2e:8030"
         username = root
         password = ""
-        database = "test"
-        table = "e2e_table_sink"
-        batch_max_rows = 100
-        max_retries = 3
+        table.identifier = "test.e2e_table_sink"
+        sink.enable-2pc = "false"
+        sink.label-prefix = "test_doris"
         doris.config = {
-          format = "JSON"
-          strip_outer_array = true
+            format="json"
+            read_json_by_line="true"
         }
     }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 1affae9b3..a0e839d77 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -41,6 +41,7 @@
         <module>connector-http-e2e</module>
         <module>connector-rabbitmq-e2e</module>
         <module>connector-kafka-e2e</module>
+        <module>connector-doris-e2e</module>
         <module>connector-fake-e2e</module>
         <module>connector-elasticsearch-e2e</module>
         <module>connector-iotdb-e2e</module>