Posted to commits@doris.apache.org by zy...@apache.org on 2023/11/06 08:56:10 UTC

(doris-flink-connector) branch master updated: [improve] add multi table sink to DorisSink (#224)

This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b2a14a  [improve] add multi table sink to DorisSink  (#224)
0b2a14a is described below

commit 0b2a14a93d3b65115c25fe59d84d6b3375da517d
Author: wudi <67...@qq.com>
AuthorDate: Mon Nov 6 16:56:04 2023 +0800

    [improve] add multi table sink to DorisSink  (#224)
    
    DorisSink supports multi-table import.
    
    Example:
    ```java
    Configuration config = new Configuration();
    // config.setString("execution.savepoint.path", "/tmp/checkpoint/chk-6");
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
    env.setParallelism(1);
    env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoint/");
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(10000)));
    env.enableCheckpointing(10000);

    DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
    final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();

    Properties properties = new Properties();
    properties.setProperty("column_separator", ",");
    properties.setProperty("line_delimiter", "\n");
    properties.setProperty("format", "csv");

    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes("127.0.0.1:8030")
            .setTableIdentifier("")
            .setUsername("root")
            .setPassword("");

    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder.setLabelPrefix("xxx12")
            .setStreamLoadProp(properties)
            .setDeletable(false)
            .enable2PC();

    builder.setDorisReadOptions(readOptionBuilder.build())
            .setDorisExecutionOptions(executionBuilder.build())
            .setDorisOptions(dorisBuilder.build());

    RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
    RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
    DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
            Arrays.asList(record, record1));
    stringDataStreamSource.sinkTo(builder.build());
    ```
    For details, please refer to `org.apache.doris.flink.DorisSinkStreamMultiTableExample` in the connector's test sources.
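    
    The payload of each `RecordWithMeta` must match the stream load properties configured above (CSV with `,` as the column separator and `\n` as the line delimiter). A minimal sketch of a hypothetical helper (not part of the connector) that builds such records through the three-argument `RecordWithMeta(database, table, record)` constructor used in the example:
    ```java
    // Hypothetical convenience method; assumes the CSV column_separator "," configured above.
    static RecordWithMeta csvRecord(String database, String table, String... columns) {
        return new RecordWithMeta(database, table, String.join(",", columns));
    }
    
    // Rows built this way are routed to different tables through the same DorisSink:
    RecordWithMeta r1 = csvRecord("test", "test_flink_tmp1", "wangwu", "1");
    RecordWithMeta r2 = csvRecord("test", "test_flink_tmp", "wangwu", "1");
    ```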
---
 .../org/apache/doris/flink/cfg/DorisOptions.java   |   4 +
 .../org/apache/doris/flink/sink/BackendUtil.java   |  12 ++
 .../org/apache/doris/flink/sink/DorisSink.java     |   1 -
 .../doris/flink/sink/writer/DorisStreamLoad.java   |  12 +-
 .../doris/flink/sink/writer/DorisWriter.java       | 211 +++++++++++++++------
 .../doris/flink/sink/writer/DorisWriterState.java  |  32 +++-
 .../sink/writer/DorisWriterStateSerializer.java    |  17 +-
 .../doris/flink/sink/writer/LabelGenerator.java    |  19 +-
 .../flink/DorisSinkStreamMultiTableExample.java    | 100 ++++++++++
 .../org/apache/doris/flink/sink/HttpTestUtil.java  |  41 ++++
 .../flink/sink/writer/TestDorisStreamLoad.java     |  12 +-
 .../doris/flink/sink/writer/TestDorisWriter.java   |  12 +-
 .../writer/TestDorisWriterStateSerializer.java     |   4 +-
 13 files changed, 394 insertions(+), 83 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index f560eae..6391e91 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -52,6 +52,10 @@ public class DorisOptions extends DorisConnectionOptions {
         return tableIdentifier;
     }
 
+    public void setTableIdentifier(String tableIdentifier) {
+        this.tableIdentifier = tableIdentifier;
+    }
+
     public String save() throws IllegalArgumentException {
         Properties copy = new Properties();
         return IOUtils.propsToString(copy);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
index 0d45e2f..954bdd0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java
@@ -17,7 +17,11 @@
 
 package org.apache.doris.flink.sink;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.BackendV2;
 import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;
 
@@ -62,6 +66,14 @@ public class BackendUtil {
         return backends;
     }
 
+    public static BackendUtil getInstance(DorisOptions dorisOptions, DorisReadOptions readOptions, Logger logger){
+        if(StringUtils.isNotEmpty(dorisOptions.getBenodes())){
+            return new BackendUtil(dorisOptions.getBenodes());
+        } else {
+            return new BackendUtil(RestService.getBackendsV2(dorisOptions, readOptions, logger));
+        }
+    }
+
     public String getAvailableBackend() {
         long tmp = pos + backends.size();
         while (pos < tmp) {
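
The new `getInstance` factory prefers explicitly configured BE nodes and only falls back to discovering backends through the FE (`RestService`) when `benodes` is empty. A minimal usage sketch, assuming `LOG` is whatever SLF4J `Logger` the caller already has:

```java
// Sketch (not part of the patch): obtain a BackendUtil and pick a reachable backend.
BackendUtil backendUtil = BackendUtil.getInstance(dorisOptions, DorisReadOptions.builder().build(), LOG);
String hostPort = backendUtil.getAvailableBackend(); // "host:port" of an available BE
```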
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index bc2d45c..3adcd9e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -140,7 +140,6 @@ public class DorisSink<IN>
         public DorisSink<IN> build() {
             Preconditions.checkNotNull(dorisOptions);
             Preconditions.checkNotNull(dorisExecutionOptions);
-            Preconditions.checkNotNull(serializer);
             if(dorisReadOptions == null) {
                 dorisReadOptions = DorisReadOptions.builder().build();
             }
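
With the `Preconditions.checkNotNull(serializer)` check removed, a `DorisSink<RecordWithMeta>` can now be built without setting a serializer, since multi-table records carry their own database/table metadata. A sketch under that assumption, reusing the option builders from the commit message example above:

```java
// Sketch: building the multi-table sink without calling setSerializer(...).
DorisSink<RecordWithMeta> sink = DorisSink.<RecordWithMeta>builder()
        .setDorisReadOptions(DorisReadOptions.builder().build())
        .setDorisExecutionOptions(executionBuilder.build())
        .setDorisOptions(dorisBuilder.build())
        .build();
```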
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 4fd9abf..fb904ef 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -114,6 +114,10 @@ public class DorisStreamLoad implements Serializable {
         return db;
     }
 
+    public String getTable() {
+        return table;
+    }
+
     public String getHostPort() {
         return hostPort;
     }
@@ -141,7 +145,7 @@ public class DorisStreamLoad implements Serializable {
             try {
                 // TODO: According to label abort txn. Currently, it can only be aborted based on txnid,
                 //  so we must first request a streamload based on the label to get the txnid.
-                String label = labelGenerator.generateLabel(startChkID);
+                String label = labelGenerator.generateTableLabel(startChkID);
                 HttpPutBuilder builder = new HttpPutBuilder();
                 builder.setUrl(loadUrlStr)
                         .baseAuth(user, passwd)
@@ -215,7 +219,7 @@ public class DorisStreamLoad implements Serializable {
 
     public RespContent stopLoad(String label) throws IOException{
         recordStream.endInput();
-        LOG.info("stream load stopped for {} on host {}", label, hostPort);
+        LOG.info("table {} stream load stopped for {} on host {}", table, label, hostPort);
         Preconditions.checkState(pendingLoadFuture != null);
         try {
            return handlePreCommitResponse(pendingLoadFuture.get());
@@ -233,7 +237,7 @@ public class DorisStreamLoad implements Serializable {
         loadBatchFirstRecord = !isResume;
         HttpPutBuilder putBuilder = new HttpPutBuilder();
         recordStream.startInput(isResume);
-        LOG.info("stream load started for {} on host {}", label, hostPort);
+        LOG.info("table {} stream load started for {} on host {}", table, label, hostPort);
         try {
             InputStreamEntity entity = new InputStreamEntity(recordStream);
             putBuilder.setUrl(loadUrlStr)
@@ -247,7 +251,7 @@ public class DorisStreamLoad implements Serializable {
                putBuilder.enable2PC();
             }
             pendingLoadFuture = executorService.submit(() -> {
-                LOG.info("start execute load");
+                LOG.info("table {} start execute load", table);
                 return httpClient.execute(putBuilder.build());
             });
         } catch (Exception e) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index b61d174..8550a21 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -23,28 +23,31 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.exception.DorisRuntimeException;
 import org.apache.doris.flink.exception.StreamLoadException;
-import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.RespContent;
 import org.apache.doris.flink.sink.BackendUtil;
 import org.apache.doris.flink.sink.DorisCommittable;
 import org.apache.doris.flink.sink.HttpUtil;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.StatefulSink;
 import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -62,21 +65,21 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
     private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
     private final long lastCheckpointId;
     private long curCheckpointId;
-    private DorisStreamLoad dorisStreamLoad;
-    volatile boolean loading;
+    private Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>();
+    private Map<String, LabelGenerator> labelGeneratorMap = new ConcurrentHashMap<>();
+    volatile boolean globalLoading;
+    private Map<String, Boolean> loadingMap = new ConcurrentHashMap<>();
     private final DorisOptions dorisOptions;
     private final DorisReadOptions dorisReadOptions;
     private final DorisExecutionOptions executionOptions;
     private final String labelPrefix;
-    private final LabelGenerator labelGenerator;
+    private final int subtaskId;
     private final int intervalTime;
-    private final DorisWriterState dorisWriterState;
     private final DorisRecordSerializer<IN> serializer;
     private final transient ScheduledExecutorService scheduledExecutorService;
     private transient Thread executorThread;
     private transient volatile Exception loadException = null;
     private BackendUtil backendUtil;
-    private String currentLabel;
 
     public DorisWriter(Sink.InitContext initContext,
                        Collection<DorisWriterState> state,
@@ -91,106 +94,188 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
         this.curCheckpointId = lastCheckpointId + 1;
         LOG.info("restore checkpointId {}", lastCheckpointId);
         LOG.info("labelPrefix " + executionOptions.getLabelPrefix());
-        this.dorisWriterState = new DorisWriterState(executionOptions.getLabelPrefix());
-        this.labelPrefix = executionOptions.getLabelPrefix() + "_" + initContext.getSubtaskId();
-        this.labelGenerator = new LabelGenerator(labelPrefix, executionOptions.enabled2PC());
+        this.labelPrefix = executionOptions.getLabelPrefix();
+        this.subtaskId = initContext.getSubtaskId();
         this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("stream-load-check"));
         this.serializer = serializer;
         this.dorisOptions = dorisOptions;
         this.dorisReadOptions = dorisReadOptions;
         this.executionOptions = executionOptions;
         this.intervalTime = executionOptions.checkInterval();
-        this.loading = false;
+        this.globalLoading = false;
 
         initializeLoad(state);
     }
 
     public void initializeLoad(Collection<DorisWriterState> state) {
-        this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
-                dorisOptions.getBenodes())
-                : new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
+        this.backendUtil = BackendUtil.getInstance(dorisOptions, dorisReadOptions, LOG);
         try {
-            this.dorisStreamLoad = new DorisStreamLoad(
-                    backendUtil.getAvailableBackend(),
-                    dorisOptions,
-                    executionOptions,
-                    labelGenerator, new HttpUtil().getHttpClient());
-            // TODO: we need check and abort all pending transaction.
-            //  Discard transactions that may cause the job to fail.
             if(executionOptions.enabled2PC()) {
-                dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
+                abortLingeringTransactions(state);
             }
         } catch (Exception e) {
+            LOG.error("Failed to abort transaction.", e);
             throw new DorisRuntimeException(e);
         }
         // get main work thread.
         executorThread = Thread.currentThread();
-        this.currentLabel = labelGenerator.generateLabel(curCheckpointId);
         // when uploading data in streaming mode, we need to regularly detect whether there are exceptions.
         scheduledExecutorService.scheduleWithFixedDelay(this::checkDone, 200, intervalTime, TimeUnit.MILLISECONDS);
     }
 
+    private void abortLingeringTransactions(Collection<DorisWriterState> recoveredStates) throws Exception {
+        List<String> alreadyAborts = new ArrayList<>();
+        //abort label in state
+         for(DorisWriterState state : recoveredStates){
+             // Todo: When the sink parallelism is reduced,
+             //  the transactions of the removed (redundant) subtasks also need to be aborted.
+             if(!state.getLabelPrefix().equals(labelPrefix)){
+                 LOG.warn("Label prefix from previous execution {} has changed to {}.", state.getLabelPrefix(), executionOptions.getLabelPrefix());
+             }
+             String key = state.getDatabase() + "." + state.getTable();
+             DorisStreamLoad streamLoader = getStreamLoader(key);
+             streamLoader.abortPreCommit(state.getLabelPrefix(), curCheckpointId);
+             alreadyAborts.add(state.getLabelPrefix());
+        }
+
+        // TODO: In a multi-table scenario, if the job does not restore from a checkpoint
+        //  and the labelPrefix is modified at startup, we cannot abort the previous labels.
+        if(!alreadyAborts.contains(labelPrefix)
+                && StringUtils.isNotEmpty(dorisOptions.getTableIdentifier())
+                && StringUtils.isNotEmpty(labelPrefix)){
+            //abort current labelPrefix
+            DorisStreamLoad streamLoader = getStreamLoader(dorisOptions.getTableIdentifier());
+            streamLoader.abortPreCommit(labelPrefix, curCheckpointId);
+        }
+    }
+
     @Override
     public void write(IN in, Context context) throws IOException {
         checkLoadException();
-        byte[] serialize = serializer.serialize(in);
-        if(Objects.isNull(serialize)){
-            //ddl record
+        Tuple2<String, byte[]> rowTuple = serializeRecord(in);
+        String tableKey = rowTuple.f0;
+        byte[] serializeRow = rowTuple.f1;
+        if(serializeRow == null){
             return;
         }
-        if(!loading) {
+
+        DorisStreamLoad streamLoader = getStreamLoader(tableKey);
+        if(!loadingMap.containsKey(tableKey)) {
             // start stream load only when there has data
-            dorisStreamLoad.startLoad(currentLabel, false);
-            loading = true;
+            LabelGenerator labelGenerator = getLabelGenerator(tableKey);
+            String currentLabel = labelGenerator.generateTableLabel(curCheckpointId);
+            streamLoader.startLoad(currentLabel, false);
+            loadingMap.put(tableKey, true);
+            globalLoading = true;
+        }
+        streamLoader.writeRecord(serializeRow);
+    }
+
+    private Tuple2<String, byte[]> serializeRecord(IN in) throws IOException {
+        String tableKey = dorisOptions.getTableIdentifier();
+        byte[] serializeRow = null;
+        if(serializer != null) {
+            serializeRow = serializer.serialize(in);
+            if(Objects.isNull(serializeRow)){
+                //ddl record by JsonDebeziumSchemaSerializer
+                return Tuple2.of(tableKey, null);
+            }
         }
-        dorisStreamLoad.writeRecord(serialize);
+        //multi table load
+        if(in instanceof RecordWithMeta){
+            RecordWithMeta row = (RecordWithMeta) in;
+            if(StringUtils.isBlank(row.getTable())
+                    || StringUtils.isBlank(row.getDatabase())
+                    || row.getRecord() == null){
+                LOG.warn("Record or meta format is incorrect, ignore record db:{}, table:{}, row:{}", row.getDatabase(), row.getTable(), row.getRecord());
+                return Tuple2.of(tableKey, null);
+            }
+            tableKey = row.getDatabase() + "." + row.getTable();
+            serializeRow = row.getRecord().getBytes(StandardCharsets.UTF_8);
+        }
+        return Tuple2.of(tableKey, serializeRow);
     }
 
     @Override
     public void flush(boolean flush) throws IOException, InterruptedException {
-
+        // No action is triggered here; everything happens in prepareCommit()
     }
 
-
     @Override
     public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
-        if(!loading){
-            //There is no data during the entire checkpoint period
+        // Verify whether data is written during a checkpoint
+        if(!globalLoading && loadingMap.values().stream().noneMatch(Boolean::booleanValue)){
             return Collections.emptyList();
         }
         // disable exception checker before stop load.
-        loading = false;
-        Preconditions.checkState(dorisStreamLoad != null);
-        RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
-        if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
-            String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
-            throw new DorisRuntimeException(errMsg);
-        }
-        if (!executionOptions.enabled2PC()) {
-            return Collections.emptyList();
+        globalLoading = false;
+        // clean loadingMap
+        loadingMap.clear();
+
+        // submit stream load http request
+        List<DorisCommittable> committableList = new ArrayList<>();
+        for(Map.Entry<String, DorisStreamLoad> streamLoader : dorisStreamLoadMap.entrySet()){
+            String tableIdentifier = streamLoader.getKey();
+            DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
+            LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
+            String currentLabel = labelGenerator.generateTableLabel(curCheckpointId);
+            RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
+            if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
+                String errMsg = String.format("table %s stream load error: %s, see more in %s", tableIdentifier, respContent.getMessage(), respContent.getErrorURL());
+                throw new DorisRuntimeException(errMsg);
+            }
+            if(executionOptions.enabled2PC()){
+                long txnId = respContent.getTxnId();
+                committableList.add(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
+            }
         }
-        long txnId = respContent.getTxnId();
-        return Collections.singletonList(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
+        return committableList;
     }
 
     @Override
     public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
-        Preconditions.checkState(dorisStreamLoad != null);
-        // dynamic refresh BE node
-        this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+        List<DorisWriterState> writerStates = new ArrayList<>();
+        for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){
+            //Dynamic refresh backend
+            dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+            DorisWriterState writerState = new DorisWriterState(labelPrefix, dorisStreamLoad.getDb(), dorisStreamLoad.getTable(), subtaskId);
+            writerStates.add(writerState);
+        }
         this.curCheckpointId = checkpointId + 1;
-        this.currentLabel = labelGenerator.generateLabel(curCheckpointId);
-        return Collections.singletonList(dorisWriterState);
+        return writerStates;
+    }
+
+    private LabelGenerator getLabelGenerator(String tableKey){
+        return labelGeneratorMap.computeIfAbsent(tableKey, v-> new LabelGenerator(labelPrefix, executionOptions.enabled2PC(), tableKey, subtaskId));
+    }
+
+    private DorisStreamLoad getStreamLoader(String tableKey){
+        LabelGenerator labelGenerator = getLabelGenerator(tableKey);
+        dorisOptions.setTableIdentifier(tableKey);
+        return dorisStreamLoadMap.computeIfAbsent(tableKey, v -> new DorisStreamLoad(backendUtil.getAvailableBackend(),
+                dorisOptions,
+                executionOptions,
+                labelGenerator,
+                new HttpUtil().getHttpClient()));
     }
 
+    /**
+     * Periodically check the status of the stream load HTTP requests
+     */
     private void checkDone() {
+        for(Map.Entry<String, DorisStreamLoad> streamLoadMap : dorisStreamLoadMap.entrySet()){
+            checkAllDone(streamLoadMap.getKey(), streamLoadMap.getValue());
+        }
+    }
+
+    private void checkAllDone(String tableIdentifier, DorisStreamLoad dorisStreamLoad){
         // the load future is done and checked in prepareCommit().
         // this will check error while loading.
         LOG.debug("start timer checker, interval {} ms", intervalTime);
         if (dorisStreamLoad.getPendingLoadFuture() != null
                 && dorisStreamLoad.getPendingLoadFuture().isDone()) {
-            if (!loading) {
-                LOG.debug("not loading, skip timer checker");
+            if (!globalLoading || !loadingMap.get(tableIdentifier)) {
+                LOG.debug("not loading, skip timer checker for table {}", tableIdentifier);
                 return;
             }
 
@@ -202,13 +287,14 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
                 // use send cached data to new txn, then notify to restart the stream
                 if (executionOptions.isUseCache()) {
                     try {
-                        this.dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
+                        dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend());
                         if (executionOptions.enabled2PC()) {
                             dorisStreamLoad.abortPreCommit(labelPrefix, curCheckpointId);
                         }
                         // start a new txn(stream load)
-                        LOG.info("getting exception, breakpoint resume for checkpoint ID: {}", curCheckpointId);
-                        dorisStreamLoad.startLoad(labelGenerator.generateLabel(curCheckpointId), true);
+                        LOG.info("getting exception, breakpoint resume for checkpoint ID: {}, table {}", curCheckpointId, tableIdentifier);
+                        LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
+                        dorisStreamLoad.startLoad(labelGenerator.generateTableLabel(curCheckpointId), true);
                     } catch (Exception e) {
                         throw new DorisRuntimeException(e);
                     }
@@ -222,7 +308,7 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
                     }
 
                     loadException = new StreamLoadException(errorMsg);
-                    LOG.error("stream load finished unexpectedly, interrupt worker thread! {}", errorMsg);
+                    LOG.error("table {} stream load finished unexpectedly, interrupt worker thread! {}", tableIdentifier, errorMsg);
                     // set the executor thread interrupted in case blocking in write data.
                     executorThread.interrupt();
                 }
@@ -238,21 +324,24 @@ public class DorisWriter<IN> implements StatefulSink.StatefulSinkWriter<IN, Dori
 
     @VisibleForTesting
     public boolean isLoading() {
-        return this.loading;
+        return this.globalLoading;
     }
 
     @VisibleForTesting
-    public void setDorisStreamLoad(DorisStreamLoad streamLoad) {
-        this.dorisStreamLoad = streamLoad;
+    public void setDorisStreamLoadMap(Map<String, DorisStreamLoad> streamLoadMap) {
+        this.dorisStreamLoadMap = streamLoadMap;
     }
 
     @Override
     public void close() throws Exception {
+        LOG.info("Close DorisWriter.");
         if (scheduledExecutorService != null) {
             scheduledExecutorService.shutdownNow();
         }
-        if (dorisStreamLoad != null) {
-            dorisStreamLoad.close();
+        if (dorisStreamLoadMap != null && !dorisStreamLoadMap.isEmpty()) {
+            for(DorisStreamLoad dorisStreamLoad : dorisStreamLoadMap.values()){
+                dorisStreamLoad.close();
+            }
         }
     }
 
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java
index 1ec9727..a08e58d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterState.java
@@ -24,14 +24,36 @@ import java.util.Objects;
  */
 public class DorisWriterState {
     String labelPrefix;
+    String database;
+    String table;
+    int subtaskId;
     public DorisWriterState(String labelPrefix) {
         this.labelPrefix = labelPrefix;
     }
 
+    public DorisWriterState(String labelPrefix, String database, String table, int subtaskId) {
+        this.labelPrefix = labelPrefix;
+        this.database = database;
+        this.table = table;
+        this.subtaskId = subtaskId;
+    }
+
     public String getLabelPrefix() {
         return labelPrefix;
     }
 
+    public String getDatabase() {
+        return database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public int getSubtaskId() {
+        return subtaskId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -41,18 +63,24 @@ public class DorisWriterState {
             return false;
         }
         DorisWriterState that = (DorisWriterState) o;
-        return Objects.equals(labelPrefix, that.labelPrefix);
+        return Objects.equals(labelPrefix, that.labelPrefix)
+                && Objects.equals(database, that.database)
+                && Objects.equals(table, that.table)
+                && Objects.equals(subtaskId, that.subtaskId);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(labelPrefix);
+        return Objects.hash(labelPrefix, database, table, subtaskId);
     }
 
     @Override
     public String toString() {
         return "DorisWriterState{" +
                 "labelPrefix='" + labelPrefix + '\'' +
+                ", database='" + database + '\'' +
+                ", table='" + table + '\'' +
+                ", subtaskId=" + subtaskId +
                 '}';
     }
 }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java
index aed97b2..97f5cc1 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriterStateSerializer.java
@@ -30,9 +30,10 @@ import java.io.IOException;
  */
 public class DorisWriterStateSerializer implements SimpleVersionedSerializer<DorisWriterState> {
 
+    private static final int VERSION = 2;
     @Override
     public int getVersion() {
-        return 1;
+        return VERSION;
     }
 
     @Override
@@ -40,6 +41,9 @@ public class DorisWriterStateSerializer implements SimpleVersionedSerializer<Dor
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
              final DataOutputStream out = new DataOutputStream(baos)) {
             out.writeUTF(dorisWriterState.getLabelPrefix());
+            out.writeUTF(dorisWriterState.getDatabase());
+            out.writeUTF(dorisWriterState.getTable());
+            out.writeInt(dorisWriterState.getSubtaskId());
             out.flush();
             return baos.toByteArray();
         }
@@ -49,8 +53,15 @@ public class DorisWriterStateSerializer implements SimpleVersionedSerializer<Dor
     public DorisWriterState deserialize(int version, byte[] serialized) throws IOException {
         try (final ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
              final DataInputStream in = new DataInputStream(bais)) {
-            final String labelPrefix = in.readUTF();
-            return new DorisWriterState(labelPrefix);
+             String labelPrefix = in.readUTF();
+             if(version == 1){
+                 return new DorisWriterState(labelPrefix);
+             }else {
+                 final String database = in.readUTF();
+                 final String table = in.readUTF();
+                 final int subtaskId = in.readInt();
+                 return new DorisWriterState(labelPrefix, database, table, subtaskId);
+             }
         }
     }
 }
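
The serializer above bumps the state version to 2 so that database, table, and subtask id can be restored per table, while version-1 snapshots taken by older connector releases still deserialize (with only the label prefix populated). A small sketch of the round trip, using names from this patch:

```java
// Sketch: version-aware snapshot round trip with the new serializer.
DorisWriterStateSerializer serializer = new DorisWriterStateSerializer();
DorisWriterState state = new DorisWriterState("xxx12", "test", "test_flink_a", 0);
byte[] bytes = serializer.serialize(state);

// New snapshots are written and read as version 2.
DorisWriterState restored = serializer.deserialize(serializer.getVersion(), bytes);

// A payload written by an older (version 1) connector would be passed in with
// version == 1 and restore only the labelPrefix field.
```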
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
index 55f7811..60f5595 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LabelGenerator.java
@@ -16,6 +16,8 @@
 // under the License.
 package org.apache.doris.flink.sink.writer;
 
+import org.apache.flink.util.Preconditions;
+
 import java.util.UUID;
 
 /**
@@ -24,14 +26,29 @@ import java.util.UUID;
 public class LabelGenerator {
     private String labelPrefix;
     private boolean enable2PC;
+    private String tableIdentifier;
+    private int subtaskId;
 
     public LabelGenerator(String labelPrefix, boolean enable2PC) {
         this.labelPrefix = labelPrefix;
         this.enable2PC = enable2PC;
     }
 
+    public LabelGenerator(String labelPrefix, boolean enable2PC, String tableIdentifier, int subtaskId) {
+        this(labelPrefix, enable2PC);
+        // The label of stream load can not contain `.`
+        this.tableIdentifier = tableIdentifier.replace(".", "_");
+        this.subtaskId = subtaskId;
+    }
+
     public String generateLabel(long chkId) {
-        String label = labelPrefix + "_" + chkId;
+        String label = String.format("%s_%s_%s", labelPrefix, subtaskId, chkId);
+        return enable2PC ? label : label + "_" + UUID.randomUUID();
+    }
+
+    public String generateTableLabel(long chkId) {
+        Preconditions.checkState(tableIdentifier != null);
+        String label = String.format("%s_%s_%s_%s", labelPrefix, tableIdentifier, subtaskId, chkId);
         return enable2PC ? label : label + "_" + UUID.randomUUID();
     }
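
With 2PC enabled, `generateTableLabel` yields labels of the form `labelPrefix_database_table_subtaskId_checkpointId`, with dots in the table identifier replaced by underscores because a stream load label cannot contain `.`; without 2PC a random UUID suffix is appended. A sketch of the values these generators produce, using the prefix from the commit message example:

```java
// Sketch: labels produced by the generators above with 2PC enabled (no UUID suffix).
LabelGenerator perTable = new LabelGenerator("xxx12", true, "test.test_flink_a", 0);
String tableLabel = perTable.generateTableLabel(7); // "xxx12_test_test_flink_a_0_7"

LabelGenerator single = new LabelGenerator("xxx12", true);
String label = single.generateLabel(7);             // "xxx12_0_7"
```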
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java
new file mode 100644
index 0000000..a884ea2
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkStreamMultiTableExample.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
+import org.apache.doris.flink.sink.batch.RecordWithMeta;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+
+import java.util.Properties;
+import java.util.UUID;
+
+
+public class DorisSinkStreamMultiTableExample {
+    public static void main(String[] args) throws Exception {
+        Configuration config = new Configuration();
+//        config.setString("execution.savepoint.path","/tmp/checkpoint/chk-6");
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(1);
+        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoint/");
+        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(10000)));
+        env.enableCheckpointing(10000);
+        DorisSink.Builder<RecordWithMeta> builder = DorisSink.builder();
+        final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
+        Properties properties = new Properties();
+        properties.setProperty("column_separator", ",");
+        properties.setProperty("line_delimiter", "\n");
+        properties.setProperty("format", "csv");
+        DorisOptions.Builder dorisBuilder = DorisOptions.builder();
+        dorisBuilder.setFenodes("127.0.0.1:8030")
+                .setTableIdentifier("")
+                .setUsername("root")
+                .setPassword("");
+
+        DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
+        executionBuilder.setLabelPrefix("xxx12")
+                .setStreamLoadProp(properties)
+                .setDeletable(false).enable2PC();
+
+        builder.setDorisReadOptions(readOptionBuilder.build())
+                .setDorisExecutionOptions(executionBuilder.build())
+                .setDorisOptions(dorisBuilder.build());
+
+//        RecordWithMeta record = new RecordWithMeta("test", "test_flink_tmp1", "wangwu,1");
+//        RecordWithMeta record1 = new RecordWithMeta("test", "test_flink_tmp", "wangwu,1");
+//        DataStreamSource<RecordWithMeta> stringDataStreamSource = env.fromCollection(
+//                Arrays.asList(record, record1));
+//        stringDataStreamSource.sinkTo(builder.build());
+
+          env.addSource(new ParallelSourceFunction<RecordWithMeta>() {
+            private Long id = 1000000L;
+            @Override
+            public void run(SourceContext<RecordWithMeta> out) throws Exception {
+                while (true) {
+                    id = id + 1;
+                    RecordWithMeta record = new RecordWithMeta("test", "test_flink_a", UUID.randomUUID() + ",1");
+                    out.collect(record);
+                    record = new RecordWithMeta("test", "test_flink_b", UUID.randomUUID() + ",2");
+                    out.collect(record);
+                    if(id > 100){
+                        //mock dynamic add table
+                        RecordWithMeta record3 = new RecordWithMeta("test", "test_flink_c", UUID.randomUUID() + ",1");
+                        out.collect(record3);
+                    }
+                    Thread.sleep(3000);
+                }
+            }
+
+            @Override
+            public void cancel() {
+
+            }
+        }).sinkTo(builder.build());
+
+        env.execute("doris stream multi table test");
+    }
+
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
index 67e8ba3..99e9846 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/HttpTestUtil.java
@@ -50,6 +50,27 @@ public class HttpTestUtil {
             "}\n" +
             "\n";
 
+    public static final String LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE = "{\n" +
+            "\"TxnId\": 1,\n" +
+            "\"Label\": \"test001_db_table_0_1\",\n" +
+            "\"TwoPhaseCommit\": \"true\",\n" +
+            "\"Status\": \"Label Already Exists\",\n" +
+            "\"ExistingJobStatus\": \"PRECOMMITTED\",\n" +
+            "\"Message\": \"errCode = 2, detailMessage = Label [test001_db_table_0_1] has already been used, relate to txn [1]\",\n" +
+            "\"NumberTotalRows\": 0,\n" +
+            "\"NumberLoadedRows\": 0,\n" +
+            "\"NumberFilteredRows\": 0,\n" +
+            "\"NumberUnselectedRows\": 0,\n" +
+            "\"LoadBytes\": 0,\n" +
+            "\"LoadTimeMs\": 0,\n" +
+            "\"BeginTxnTimeMs\": 0,\n" +
+            "\"StreamLoadPutTimeMs\": 0,\n" +
+            "\"ReadDataTimeMs\": 0,\n" +
+            "\"WriteDataTimeMs\": 0,\n" +
+            "\"CommitAndPublishTimeMs\": 0\n" +
+            "}\n" +
+            "\n";
+
     public static final String PRE_COMMIT_RESPONSE = "{\n" +
             "\"TxnId\": 2,\n" +
             "\"Label\": \"test001_0_2\",\n" +
@@ -70,6 +91,26 @@ public class HttpTestUtil {
             "}\n" +
             "\n";
 
+    public static final String PRE_COMMIT_TABLE_RESPONSE = "{\n" +
+            "\"TxnId\": 2,\n" +
+            "\"Label\": \"test001_db_table_0_2\",\n" +
+            "\"TwoPhaseCommit\": \"true\",\n" +
+            "\"Status\": \"Success\",\n" +
+            "\"Message\": \"OK\",\n" +
+            "\"NumberTotalRows\": 0,\n" +
+            "\"NumberLoadedRows\": 0,\n" +
+            "\"NumberFilteredRows\": 0,\n" +
+            "\"NumberUnselectedRows\": 0,\n" +
+            "\"LoadBytes\": 0,\n" +
+            "\"LoadTimeMs\": 0,\n" +
+            "\"BeginTxnTimeMs\": 0,\n" +
+            "\"StreamLoadPutTimeMs\": 0,\n" +
+            "\"ReadDataTimeMs\": 0,\n" +
+            "\"WriteDataTimeMs\": 0,\n" +
+            "\"CommitAndPublishTimeMs\": 0\n" +
+            "}\n" +
+            "\n";
+
     public static final String ABORT_SUCCESS_RESPONSE = "{\n" +
             "\"status\": \"Success\",\n" +
             "\"msg\": \"transaction [1] abort successfully.\"\n" +
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index e295de1..8f292e2 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -56,13 +56,13 @@ public class TestDorisStreamLoad {
     @Test
     public void testAbortPreCommit() throws Exception {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
-        CloseableHttpResponse existLabelResponse = HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_RESPONSE, true);
-        CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
+        CloseableHttpResponse existLabelResponse = HttpTestUtil.getResponse(HttpTestUtil.LABEL_EXIST_PRE_COMMIT_TABLE_RESPONSE, true);
+        CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_TABLE_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(existLabelResponse, preCommitResponse);
-        DorisStreamLoad dorisStreamLoad = spy(new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient));
+        DorisStreamLoad dorisStreamLoad = spy(new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true, "db.table", 0), httpClient));
 
         doNothing().when(dorisStreamLoad).abortTransaction(anyLong());
-        dorisStreamLoad.abortPreCommit("test001_0", 1);
+        dorisStreamLoad.abortPreCommit("test001", 1);
     }
 
     @Test
@@ -70,7 +70,7 @@ public class TestDorisStreamLoad {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse abortSuccessResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_SUCCESS_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(abortSuccessResponse);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true), httpClient);
         dorisStreamLoad.abortTransaction(anyLong());
     }
 
@@ -79,7 +79,7 @@ public class TestDorisStreamLoad {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse abortFailedResponse = HttpTestUtil.getResponse(HttpTestUtil.ABORT_FAILED_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(abortFailedResponse);
-        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001_0", true), httpClient);
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("", dorisOptions, executionOptions, new LabelGenerator("test001", true), httpClient);
         dorisStreamLoad.abortTransaction(anyLong());
     }
 
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
index 01e1559..36f98c8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java
@@ -34,7 +34,9 @@ import org.junit.Test;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -61,13 +63,14 @@ public class TestDorisWriter {
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
-
+        Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>();
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), dorisStreamLoad);
         dorisStreamLoad.startLoad("", false);
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);
-        dorisWriter.setDorisStreamLoad(dorisStreamLoad);
+        dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
         dorisWriter.write("doris,1",null);
         Collection<DorisCommittable> committableList = dorisWriter.prepareCommit();
         Assert.assertEquals(1, committableList.size());
@@ -84,11 +87,14 @@ public class TestDorisWriter {
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);
 
+        Map<String, DorisStreamLoad> dorisStreamLoadMap = new ConcurrentHashMap<>();
         DorisStreamLoad dorisStreamLoad = new DorisStreamLoad("local:8040", dorisOptions, executionOptions, new LabelGenerator("", true), httpClient);
+        dorisStreamLoadMap.put(dorisOptions.getTableIdentifier(), dorisStreamLoad);
+
         Sink.InitContext initContext = mock(Sink.InitContext.class);
         when(initContext.getRestoredCheckpointId()).thenReturn(OptionalLong.of(1));
         DorisWriter<String> dorisWriter = new DorisWriter<String>(initContext, Collections.emptyList(), new SimpleStringSerializer(), dorisOptions, readOptions, executionOptions);
-        dorisWriter.setDorisStreamLoad(dorisStreamLoad);
+        dorisWriter.setDorisStreamLoadMap(dorisStreamLoadMap);
         List<DorisWriterState> writerStates = dorisWriter.snapshotState(1);
 
         Assert.assertEquals(1, writerStates.size());
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java
index 84695d7..fce2a26 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriterStateSerializer.java
@@ -26,9 +26,9 @@ import org.junit.Test;
 public class TestDorisWriterStateSerializer {
     @Test
     public void testSerialize() throws Exception {
-        DorisWriterState expectDorisWriterState = new DorisWriterState("doris");
+        DorisWriterState expectDorisWriterState = new DorisWriterState("doris",  "db", "table", 0);
         DorisWriterStateSerializer serializer = new DorisWriterStateSerializer();
-        DorisWriterState dorisWriterState =  serializer.deserialize(1, serializer.serialize(expectDorisWriterState));
+        DorisWriterState dorisWriterState =  serializer.deserialize(2, serializer.serialize(expectDorisWriterState));
         Assert.assertEquals(expectDorisWriterState, dorisWriterState);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org