You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/09 14:13:01 UTC

[incubator-doris] branch master updated: [Feature]:Flink-connector supports streamload parameters (#6243)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d9fc1bf  [Feature]:Flink-connector supports streamload parameters (#6243)
d9fc1bf is described below

commit d9fc1bf3ca67b2f0d8925425cf9a4362829f8b62
Author: wudi <67...@qq.com>
AuthorDate: Mon Aug 9 22:12:46 2021 +0800

    [Feature]:Flink-connector supports streamload parameters (#6243)
    
    Flink-connector supports streamload parameters
    #6199
---
 docs/en/extending-doris/flink-doris-connector.md   |   1 +
 .../zh-CN/extending-doris/flink-doris-connector.md |   1 +
 .../apache/doris/flink/backend/BackendClient.java  |   5 +-
 .../doris/flink/cfg/DorisConnectionOptions.java    |   2 +-
 .../doris/flink/cfg/DorisExecutionOptions.java     |  23 +-
 .../org/apache/doris/flink/cfg/DorisOptions.java   |   2 +-
 .../apache/doris/flink/cfg/DorisReadOptions.java   |   8 +-
 .../flink/datastream/DorisSourceFunction.java      |  16 +-
 .../SimpleListDeserializationSchema.java           |   3 +-
 .../doris/flink/exception/DorisException.java      |   8 +-
 .../exception/ShouldNeverHappenException.java      |   3 +-
 .../doris/flink/exception/StreamLoadException.java |   4 +
 .../doris/flink/rest/PartitionDefinition.java      |   2 +-
 .../org/apache/doris/flink/rest/RestService.java   |  95 +++--
 .../org/apache/doris/flink/rest/SchemaUtils.java   |   3 +-
 .../org/apache/doris/flink/rest/models/Field.java  |   3 +-
 .../apache/doris/flink/serialization/RowBatch.java |   8 +-
 .../flink/table/DorisDynamicOutputFormat.java      |  57 +--
 .../flink/table/DorisDynamicTableFactory.java      | 427 +++++++++++----------
 .../doris/flink/table/DorisDynamicTableSink.java   |   2 +-
 .../doris/flink/table/DorisDynamicTableSource.java |  90 ++---
 .../doris/flink/table/DorisRowDataInputFormat.java | 358 ++++++++---------
 .../apache/doris/flink/table/DorisStreamLoad.java  |  35 +-
 .../doris/flink/table/DorisTableInputSplit.java    |   8 +-
 24 files changed, 626 insertions(+), 538 deletions(-)

diff --git a/docs/en/extending-doris/flink-doris-connector.md b/docs/en/extending-doris/flink-doris-connector.md
index 39bbe41..d17fb8a 100644
--- a/docs/en/extending-doris/flink-doris-connector.md
+++ b/docs/en/extending-doris/flink-doris-connector.md
@@ -131,6 +131,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
 | sink.batch.size                        | 100            | Maximum number of lines in a single write BE                                             |
 | sink.max-retries                        | 1            | Number of retries after writing BE failed                                              |
 | sink.batch.interval                         | 1s            | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 1 second, and the time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing. |
+| sink.properties.*     | --               | The stream load parameters, e.g. 'sink.properties.column_separator' = ','. |
 
 
 ## Doris & Flink Column Type Mapping
diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md b/docs/zh-CN/extending-doris/flink-doris-connector.md
index 22363db..ce89964 100644
--- a/docs/zh-CN/extending-doris/flink-doris-connector.md
+++ b/docs/zh-CN/extending-doris/flink-doris-connector.md
@@ -135,6 +135,7 @@ INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
 | sink.batch.size     | 100                | 单次写BE的最大行数        |
 | sink.max-retries     | 1                | 写BE失败之后的重试次数       |
 | sink.batch.interval     | 1s                | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。|
+| sink.properties.*     | --               | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等 |
 
 
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index 93b353c..40bb5c9 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -112,6 +112,7 @@ public class BackendClient {
 
     /**
      * Open a scanner for reading Doris data.
+     *
      * @param openParams thrift struct to required by request
      * @return scan open result
      * @throws ConnectedFailedException throw if cannot connect to Doris BE
@@ -147,6 +148,7 @@ public class BackendClient {
 
     /**
      * get next row batch from Doris BE
+     *
      * @param nextBatchParams thrift struct to required by request
      * @return scan batch result
      * @throws ConnectedFailedException throw if cannot connect to Doris BE
@@ -161,7 +163,7 @@ public class BackendClient {
         for (int attempt = 0; attempt < retries; ++attempt) {
             logger.debug("Attempt {} to getNext {}.", attempt, routing);
             try {
-                result  = client.get_next(nextBatchParams);
+                result = client.get_next(nextBatchParams);
                 if (result == null) {
                     logger.warn("GetNext result from {} is null.", routing);
                     continue;
@@ -189,6 +191,7 @@ public class BackendClient {
 
     /**
      * close an scanner.
+     *
      * @param closeParams thrift struct to required by request
      */
     public void closeScanner(TScanCloseParams closeParams) {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
index 619ce74..9b2187c 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisConnectionOptions.java
@@ -21,7 +21,7 @@ import org.apache.flink.util.Preconditions;
 import java.io.Serializable;
 
 /**
- *  Doris connection options.
+ * Doris connection options.
  */
 public class DorisConnectionOptions implements Serializable {
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 330cbc9..3d035ab 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -21,22 +21,29 @@ import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.time.Duration;
+import java.util.Properties;
 
 /**
  * JDBC sink batch options.
  */
-public class DorisExecutionOptions  implements Serializable {
+public class DorisExecutionOptions implements Serializable {
     private static final long serialVersionUID = 1L;
 
     private final Integer batchSize;
     private final Integer maxRetries;
     private final Long batchIntervalMs;
 
-    public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) {
+    /**
+     * Properties for the StreamLoad.
+     */
+    private final Properties streamLoadProp;
+
+    public DorisExecutionOptions(Integer batchSize, Integer maxRetries, Long batchIntervalMs, Properties streamLoadProp) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.batchSize = batchSize;
         this.maxRetries = maxRetries;
         this.batchIntervalMs = batchIntervalMs;
+        this.streamLoadProp = streamLoadProp;
     }
 
     public Integer getBatchSize() {
@@ -51,6 +58,10 @@ public class DorisExecutionOptions  implements Serializable {
         return batchIntervalMs;
     }
 
+    public Properties getStreamLoadProp() {
+        return streamLoadProp;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -62,6 +73,7 @@ public class DorisExecutionOptions  implements Serializable {
         private Integer batchSize;
         private Integer maxRetries;
         private Long batchIntervalMs;
+        private Properties streamLoadProp;
 
         public Builder setBatchSize(Integer batchSize) {
             this.batchSize = batchSize;
@@ -78,8 +90,13 @@ public class DorisExecutionOptions  implements Serializable {
             return this;
         }
 
+        public Builder setStreamLoadProp(Properties streamLoadProp) {
+            this.streamLoadProp = streamLoadProp;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
+            return new DorisExecutionOptions(batchSize, maxRetries, batchIntervalMs, streamLoadProp);
         }
     }
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index c542d6b..512d0ab 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -25,7 +25,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Options for the Doris connector.
  */
-public class DorisOptions extends DorisConnectionOptions{
+public class DorisOptions extends DorisConnectionOptions {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 833ecf5..53cefaa 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -22,7 +22,7 @@ import java.io.Serializable;
 /**
  * Doris read Options
  */
-public class DorisReadOptions  implements Serializable {
+public class DorisReadOptions implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
@@ -35,7 +35,7 @@ public class DorisReadOptions  implements Serializable {
     private Integer requestRetries;
     private Integer requestBatchSize;
     private Long execMemLimit;
-    private Integer  deserializeQueueSize;
+    private Integer deserializeQueueSize;
     private Boolean deserializeArrowAsync;
 
     public DorisReadOptions(String readFields, String filterQuery, Integer requestTabletSize, Integer requestConnectTimeoutMs, Integer requestReadTimeoutMs,
@@ -117,7 +117,7 @@ public class DorisReadOptions  implements Serializable {
         private Integer requestRetries;
         private Integer requestBatchSize;
         private Long execMemLimit;
-        private Integer  deserializeQueueSize;
+        private Integer deserializeQueueSize;
         private Boolean deserializeArrowAsync;
 
 
@@ -177,7 +177,7 @@ public class DorisReadOptions  implements Serializable {
         }
 
         public DorisReadOptions build() {
-            return new DorisReadOptions(readFields,filterQuery,requestTabletSize,requestConnectTimeoutMs,requestReadTimeoutMs,requestQueryTimeoutS,requestRetries,requestBatchSize,execMemLimit,deserializeQueueSize,deserializeArrowAsync);
+            return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync);
         }
     }
 
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
index 5c580db..82ab224 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java
@@ -38,15 +38,15 @@ import java.util.List;
 
 public class DorisSourceFunction<T> extends RichSourceFunction<T> implements ResultTypeQueryable<T> {
 
-    private  static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
+    private static final Logger logger = LoggerFactory.getLogger(DorisSourceFunction.class);
 
     private DorisDeserializationSchema deserializer;
     private DorisOptions options;
     private DorisReadOptions readOptions;
-    private List<PartitionDefinition>  dorisPartitions;
+    private List<PartitionDefinition> dorisPartitions;
     private ScalaValueReader scalaValueReader;
 
-    public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer)  {
+    public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema deserializer) {
         this.deserializer = deserializer;
         this.options = streamOptions.getOptions();
         this.readOptions = streamOptions.getReadOptions();
@@ -55,14 +55,14 @@ public class DorisSourceFunction<T> extends RichSourceFunction<T> implements Res
     @Override
     public void open(Configuration parameters) throws Exception {
         super.open(parameters);
-        this.dorisPartitions =  RestService.findPartitions(options,readOptions,logger);
+        this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
     }
 
     @Override
-    public void run(SourceContext sourceContext) throws Exception{
-        for(PartitionDefinition partitions : dorisPartitions){
-            scalaValueReader = new ScalaValueReader(partitions, options,readOptions);
-            while (scalaValueReader.hasNext()){
+    public void run(SourceContext sourceContext) throws Exception {
+        for (PartitionDefinition partitions : dorisPartitions) {
+            scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
+            while (scalaValueReader.hasNext()) {
                 Object next = scalaValueReader.next();
                 sourceContext.collect(next);
             }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
index 6fd68ec..7fcf2f6 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/SimpleListDeserializationSchema.java
@@ -18,10 +18,11 @@ package org.apache.doris.flink.deserialization;
 
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+
 import java.util.List;
 
 
-public class SimpleListDeserializationSchema implements DorisDeserializationSchema{
+public class SimpleListDeserializationSchema implements DorisDeserializationSchema {
 
     @Override
     public TypeInformation getProducedType() {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java
index 26e11e5..2274f87 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisException.java
@@ -21,18 +21,22 @@ public class DorisException extends Exception {
     public DorisException() {
         super();
     }
+
     public DorisException(String message) {
         super(message);
     }
+
     public DorisException(String message, Throwable cause) {
         super(message, cause);
     }
+
     public DorisException(Throwable cause) {
         super(cause);
     }
+
     protected DorisException(String message, Throwable cause,
-                        boolean enableSuppression,
-                        boolean writableStackTrace) {
+                             boolean enableSuppression,
+                             boolean writableStackTrace) {
         super(message, cause, enableSuppression, writableStackTrace);
     }
 }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
index 307c398..a26718d 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java
@@ -17,4 +17,5 @@
 
 package org.apache.doris.flink.exception;
 
-public class ShouldNeverHappenException extends DorisException { }
+public class ShouldNeverHappenException extends DorisException {
+}
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java
index 6d7a192..233d27e 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/StreamLoadException.java
@@ -21,15 +21,19 @@ public class StreamLoadException extends Exception {
     public StreamLoadException() {
         super();
     }
+
     public StreamLoadException(String message) {
         super(message);
     }
+
     public StreamLoadException(String message, Throwable cause) {
         super(message, cause);
     }
+
     public StreamLoadException(Throwable cause) {
         super(cause);
     }
+
     protected StreamLoadException(String message, Throwable cause,
                                   boolean enableSuppression,
                                   boolean writableStackTrace) {
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
index 19edd21..8a66f76 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/PartitionDefinition.java
@@ -103,7 +103,7 @@ public class PartitionDefinition implements Serializable, Comparable<PartitionDe
         similar.retainAll(o.tabletIds);
         diffSelf.removeAll(similar);
         diffOther.removeAll(similar);
-        if  (diffSelf.size() == 0) {
+        if (diffSelf.size() == 0) {
             return 0;
         }
         long diff = Collections.min(diffSelf) - Collections.min(diffOther);
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
index cd5b6d5..184afd3 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java
@@ -88,13 +88,14 @@ public class RestService implements Serializable {
 
     /**
      * send request to Doris FE and get response json string.
+     *
      * @param options configuration of request
      * @param request {@link HttpRequestBase} real request
-     * @param logger {@link Logger}
+     * @param logger  {@link Logger}
      * @return Doris FE response in json string
      * @throws ConnectedFailedException throw when cannot connect to Doris FE
      */
-    private static String send(DorisOptions options,DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws
+    private static String send(DorisOptions options, DorisReadOptions readOptions, HttpRequestBase request, Logger logger) throws
             ConnectedFailedException {
         int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs();
         int socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs();
@@ -116,10 +117,10 @@ public class RestService implements Serializable {
             logger.debug("Attempt {} to request {}.", attempt, request.getURI());
             try {
                 String response;
-                if (request instanceof HttpGet){
-                    response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(),logger);
+                if (request instanceof HttpGet) {
+                    response = getConnectionGet(request.getURI().toString(), options.getUsername(), options.getPassword(), logger);
                 } else {
-                    response = getConnectionPost(request,  options.getUsername(), options.getPassword(),logger);
+                    response = getConnectionPost(request, options.getUsername(), options.getPassword(), logger);
                 }
                 if (response == null) {
                     logger.warn("Failed to get response from Doris FE {}, http code is {}",
@@ -147,14 +148,14 @@ public class RestService implements Serializable {
         throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex);
     }
 
-    private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException {
+    private static String getConnectionPost(HttpRequestBase request, String user, String passwd, Logger logger) throws IOException {
         URL url = new URL(request.getURI().toString());
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setInstanceFollowRedirects(false);
         conn.setRequestMethod(request.getMethod());
         String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
         conn.setRequestProperty("Authorization", "Basic " + authEncoding);
-        InputStream content = ((HttpPost)request).getEntity().getContent();
+        InputStream content = ((HttpPost) request).getEntity().getContent();
         String res = IOUtils.toString(content);
         conn.setDoOutput(true);
         conn.setDoInput(true);
@@ -164,21 +165,21 @@ public class RestService implements Serializable {
         // flush
         out.flush();
         // read response
-        return parseResponse(conn,logger);
+        return parseResponse(conn, logger);
     }
 
-    private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException {
+    private static String getConnectionGet(String request, String user, String passwd, Logger logger) throws IOException {
         URL realUrl = new URL(request);
         // open connection
-        HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection();
+        HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection();
         String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
         connection.setRequestProperty("Authorization", "Basic " + authEncoding);
 
         connection.connect();
-        return parseResponse(connection,logger);
+        return parseResponse(connection, logger);
     }
 
-    private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException {
+    private static String parseResponse(HttpURLConnection connection, Logger logger) throws IOException {
         if (connection.getResponseCode() != HttpStatus.SC_OK) {
             logger.warn("Failed to get response from Doris  {}, http code is {}",
                     connection.getURL(), connection.getResponseCode());
@@ -198,8 +199,9 @@ public class RestService implements Serializable {
 
     /**
      * parse table identifier to array.
+     *
      * @param tableIdentifier table identifier string
-     * @param logger {@link Logger}
+     * @param logger          {@link Logger}
      * @return first element is db name, second element is table name
      * @throws IllegalArgumentException table identifier is illegal
      */
@@ -220,8 +222,9 @@ public class RestService implements Serializable {
 
     /**
      * choice a Doris FE node to request.
+     *
      * @param feNodes Doris FE node list, separate be comma
-     * @param logger slf4j logger
+     * @param logger  slf4j logger
      * @return the chosen one Doris FE node
      * @throws IllegalArgumentException fe nodes is illegal
      */
@@ -239,14 +242,15 @@ public class RestService implements Serializable {
 
     /**
      * choice a Doris BE node to request.
+     *
      * @param options configuration of request
-     * @param logger slf4j logger
+     * @param logger  slf4j logger
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
      */
     @VisibleForTesting
-    public static String randomBackend(DorisOptions options,DorisReadOptions readOptions ,Logger logger) throws DorisException, IOException {
-        List<BackendRow> backends = getBackends(options,readOptions, logger);
+    public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
+        List<BackendRow> backends = getBackends(options, readOptions, logger);
         logger.trace("Parse beNodes '{}'.", backends);
         if (backends == null || backends.isEmpty()) {
             logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
@@ -259,19 +263,20 @@ public class RestService implements Serializable {
 
     /**
      * get  Doris BE nodes to request.
+     *
      * @param options configuration of request
-     * @param logger slf4j logger
+     * @param logger  slf4j logger
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
      */
     @VisibleForTesting
-    static List<BackendRow> getBackends(DorisOptions options,DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
+    static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
         String feNodes = options.getFenodes();
         String feNode = randomEndpoint(feNodes, logger);
         String beUrl = "http://" + feNode + BACKENDS;
         HttpGet httpGet = new HttpGet(beUrl);
-        String response = send(options, readOptions,httpGet, logger);
-        logger.info("Backend Info:{}",response);
+        String response = send(options, readOptions, httpGet, logger);
+        logger.info("Backend Info:{}", response);
         List<BackendRow> backends = parseBackend(response, logger);
         return backends;
     }
@@ -306,8 +311,9 @@ public class RestService implements Serializable {
 
     /**
      * get a valid URI to connect Doris FE.
+     *
      * @param options configuration of request
-     * @param logger {@link Logger}
+     * @param logger  {@link Logger}
      * @return uri string
      * @throws IllegalArgumentException throw when configuration is illegal
      */
@@ -323,24 +329,26 @@ public class RestService implements Serializable {
 
     /**
      * discover Doris table schema from Doris FE.
+     *
      * @param options configuration of request
-     * @param logger slf4j logger
+     * @param logger  slf4j logger
      * @return Doris table schema
      * @throws DorisException throw when discover failed
      */
-    public static Schema getSchema(DorisOptions options,DorisReadOptions readOptions, Logger logger)
+    public static Schema getSchema(DorisOptions options, DorisReadOptions readOptions, Logger logger)
             throws DorisException {
         logger.trace("Finding schema.");
         HttpGet httpGet = new HttpGet(getUriStr(options, logger) + SCHEMA);
-        String response = send(options, readOptions,httpGet, logger);
+        String response = send(options, readOptions, httpGet, logger);
         logger.debug("Find schema response is '{}'.", response);
         return parseSchema(response, logger);
     }
 
     /**
      * translate Doris FE response to inner {@link Schema} struct.
+     *
      * @param response Doris FE response
-     * @param logger {@link Logger}
+     * @param logger   {@link Logger}
      * @return inner {@link Schema} struct
      * @throws DorisException throw when translate failed
      */
@@ -381,14 +389,15 @@ public class RestService implements Serializable {
 
     /**
      * find Doris RDD partitions from Doris FE.
+     *
      * @param options configuration of request
-     * @param logger {@link Logger}
+     * @param logger  {@link Logger}
      * @return an list of Doris RDD partitions
      * @throws DorisException throw when find partition failed
      */
     public static List<PartitionDefinition> findPartitions(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException {
         String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
-        String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" :readOptions.getReadFields();
+        String readFields = StringUtils.isBlank(readOptions.getReadFields()) ? "*" : readOptions.getReadFields();
         String sql = "select " + readFields +
                 " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`";
         if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
@@ -397,14 +406,14 @@ public class RestService implements Serializable {
         logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
 
         HttpPost httpPost = new HttpPost(getUriStr(options, logger) + QUERY_PLAN);
-        String entity = "{\"sql\": \""+ sql +"\"}";
+        String entity = "{\"sql\": \"" + sql + "\"}";
         logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
         StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
         stringEntity.setContentEncoding("UTF-8");
         stringEntity.setContentType("application/json");
         httpPost.setEntity(stringEntity);
 
-        String resStr = send(options, readOptions,httpPost, logger);
+        String resStr = send(options, readOptions, httpPost, logger);
         logger.debug("Find partition response is '{}'.", resStr);
         QueryPlan queryPlan = getQueryPlan(resStr, logger);
         Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);
@@ -420,8 +429,9 @@ public class RestService implements Serializable {
 
     /**
      * translate Doris FE response string to inner {@link QueryPlan} struct.
+     *
      * @param response Doris FE response string
-     * @param logger {@link Logger}
+     * @param logger   {@link Logger}
      * @return inner {@link QueryPlan} struct
      * @throws DorisException throw when translate failed.
      */
@@ -461,13 +471,14 @@ public class RestService implements Serializable {
 
     /**
      * select which Doris BE to get tablet data.
+     *
      * @param queryPlan {@link QueryPlan} translated from Doris FE response
-     * @param logger {@link Logger}
+     * @param logger    {@link Logger}
      * @return BE to tablets {@link Map}
      * @throws DorisException throw when select failed.
      */
     @VisibleForTesting
-    static  Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
+    static Map<String, List<Long>> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException {
         Map<String, List<Long>> be2Tablets = new HashMap<>();
         for (Map.Entry<String, Tablet> part : queryPlan.getPartitions().entrySet()) {
             logger.debug("Parse tablet info: '{}'.", part);
@@ -512,8 +523,9 @@ public class RestService implements Serializable {
 
     /**
      * tablet count limit for one Doris RDD partition
+     *
      * @param readOptions configuration of request
-     * @param logger {@link Logger}
+     * @param logger      {@link Logger}
      * @return tablet count limit
      */
     @VisibleForTesting
@@ -533,18 +545,19 @@ public class RestService implements Serializable {
 
     /**
      * translate BE tablets map to Doris RDD partition.
-     * @param options configuration of request
-     * @param be2Tablets BE to tablets {@link Map}
+     *
+     * @param options          configuration of request
+     * @param be2Tablets       BE to tablets {@link Map}
      * @param opaquedQueryPlan Doris BE execute plan getting from Doris FE
-     * @param database database name of Doris table
-     * @param table table name of Doris table
-     * @param logger {@link Logger}
+     * @param database         database name of Doris table
+     * @param table            table name of Doris table
+     * @param logger           {@link Logger}
      * @return Doris RDD partition {@link List}
      * @throws IllegalArgumentException throw when translate failed
      */
     @VisibleForTesting
-    static List<PartitionDefinition> tabletsMapToPartition(DorisOptions options,DorisReadOptions readOptions, Map<String, List<Long>> be2Tablets,
-            String opaquedQueryPlan, String database, String table, Logger logger)
+    static List<PartitionDefinition> tabletsMapToPartition(DorisOptions options, DorisReadOptions readOptions, Map<String, List<Long>> be2Tablets,
+                                                           String opaquedQueryPlan, String database, String table, Logger logger)
             throws IllegalArgumentException {
         int tabletsSize = tabletCountLimitForOnePartition(readOptions, logger);
         List<PartitionDefinition> partitions = new ArrayList<>();
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
index 46c54c0..13bde01 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/SchemaUtils.java
@@ -27,10 +27,11 @@ public class SchemaUtils {
 
     /**
      * convert Doris return schema to inner schema struct.
+     *
      * @param tscanColumnDescs Doris BE return schema
      * @return inner schema struct
      */
-    public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs ){
+    public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs) {
         Schema schema = new Schema(tscanColumnDescs.size());
         tscanColumnDescs.stream().forEach(desc -> schema.put(new Field(desc.getName(), desc.getType().name(), "", 0, 0)));
         return schema;
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
index 4ac66be..9a58180 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Field.java
@@ -26,7 +26,8 @@ public class Field {
     private int precision;
     private int scale;
 
-    public Field() { }
+    public Field() {
+    }
 
     public Field(String name, String type, String comment, int precision, int scale) {
         this.name = name;
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 8d81a37..00c699b 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -63,7 +63,7 @@ public class RowBatch {
             this.cols = new ArrayList<>(colCount);
         }
 
-        public  List<Object> getCols() {
+        public List<Object> getCols() {
             return cols;
         }
 
@@ -87,13 +87,13 @@ public class RowBatch {
         return rowBatch;
     }
 
-    public RowBatch(TScanBatchResult nextResult, Schema schema){
+    public RowBatch(TScanBatchResult nextResult, Schema schema) {
         this.schema = schema;
         this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
         this.arrowStreamReader = new ArrowStreamReader(
                 new ByteArrayInputStream(nextResult.getRows()),
                 rootAllocator
-                );
+        );
         this.offsetInRowBatch = 0;
     }
 
@@ -243,7 +243,7 @@ public class RowBatch {
                                 continue;
                             }
                             BigDecimal value = decimalVector.getObject(rowIndex).stripTrailingZeros();
-                            addValueToRow(rowIndex, DecimalData.fromBigDecimal(value,value.precision(),value.scale()));
+                            addValueToRow(rowIndex, DecimalData.fromBigDecimal(value, value.precision(), value.scale()));
                         }
                         break;
                     case "DATE":
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 33f5c85..77b53ba 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -44,17 +45,23 @@ import java.util.concurrent.TimeUnit;
 /**
  * DorisDynamicOutputFormat
  **/
-public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
+public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
-
-    private  DorisOptions options ;
-    private  DorisReadOptions readOptions;
-    private  DorisExecutionOptions executionOptions;
+    private static final String FIELD_DELIMITER_KEY = "column_separator";
+    private static final String FIELD_DELIMITER_DEFAULT = "\t";
+    private static final String LINE_DELIMITER_KEY = "line_delimiter";
+    private static final String LINE_DELIMITER_DEFAULT = "\n";
+    private static final String NULL_VALUE = "\\N";
+    private final String fieldDelimiter;
+    private final String lineDelimiter;
+
+    private DorisOptions options;
+    private DorisReadOptions readOptions;
+    private DorisExecutionOptions executionOptions;
     private DorisStreamLoad dorisStreamLoad;
-    private final String fieldDelimiter = "\t";
-    private final String lineDelimiter = "\n";
-    private final String NULL_VALUE = "\\N";
+
+
     private final List<String> batch = new ArrayList<>();
     private transient volatile boolean closed = false;
 
@@ -62,15 +69,16 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
     private transient ScheduledFuture<?> scheduledFuture;
     private transient volatile Exception flushException;
 
-    public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions,DorisExecutionOptions executionOptions) {
+    public DorisDynamicOutputFormat(DorisOptions option, DorisReadOptions readOptions, DorisExecutionOptions executionOptions) {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
+        this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
+        this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
     }
 
     @Override
     public void configure(Configuration configuration) {
-
     }
 
     @Override
@@ -80,8 +88,9 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
                 options.getTableIdentifier().split("\\.")[0],
                 options.getTableIdentifier().split("\\.")[1],
                 options.getUsername(),
-                options.getPassword());
-        LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr());
+                options.getPassword(),
+                executionOptions.getStreamLoadProp());
+        LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
 
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
             this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format"));
@@ -118,12 +127,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
     private void addBatch(RowData row) {
         StringJoiner value = new StringJoiner(this.fieldDelimiter);
         GenericRowData rowData = (GenericRowData) row;
-        for(int i = 0; i < row.getArity(); ++i) {
+        for (int i = 0; i < row.getArity(); ++i) {
             Object field = rowData.getField(i);
-            if(field != null){
+            if (field != null) {
                 value.add(field.toString());
-            }else{
-                value.add(this.NULL_VALUE);
+            } else {
+                value.add(NULL_VALUE);
             }
         }
         batch.add(value.toString());
@@ -151,12 +160,12 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
 
     public synchronized void flush() throws IOException {
         checkFlushException();
-        if(batch.isEmpty()){
+        if (batch.isEmpty()) {
             return;
         }
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
-                dorisStreamLoad.load(String.join(lineDelimiter,batch));
+                dorisStreamLoad.load(String.join(this.lineDelimiter, batch));
                 batch.clear();
                 break;
             } catch (StreamLoadException e) {
@@ -166,7 +175,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
                 }
                 try {
                     dorisStreamLoad.setHostPort(getBackend());
-                    LOG.warn("streamload error,switch be: {}",dorisStreamLoad.getLoadUrlStr(), e);
+                    LOG.warn("streamload error,switch be: {}", dorisStreamLoad.getLoadUrlStr(), e);
                     Thread.sleep(1000 * i);
                 } catch (InterruptedException ex) {
                     Thread.currentThread().interrupt();
@@ -177,10 +186,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
     }
 
 
-    private String getBackend() throws IOException{
+    private String getBackend() throws IOException {
         try {
             //get be url from fe
-           return  RestService.randomBackend(options,readOptions, LOG);
+            return RestService.randomBackend(options, readOptions, LOG);
         } catch (IOException | DorisException e) {
             LOG.error("get backends info fail");
             throw new IOException(e);
@@ -202,8 +211,8 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
      */
     public static class Builder {
         private DorisOptions.Builder optionsBuilder;
-        private  DorisReadOptions readOptions;
-        private  DorisExecutionOptions executionOptions;
+        private DorisReadOptions readOptions;
+        private DorisExecutionOptions executionOptions;
 
         public Builder() {
             this.optionsBuilder = DorisOptions.builder();
@@ -241,7 +250,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
 
         public DorisDynamicOutputFormat build() {
             return new DorisDynamicOutputFormat(
-                    optionsBuilder.build(),readOptions,executionOptions
+                    optionsBuilder.build(), readOptions, executionOptions
             );
         }
     }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 27b6f97..92d69e6 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -34,6 +34,8 @@ import org.apache.flink.table.utils.TableSchemaUtils;
 
 import java.time.Duration;
 import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT;
@@ -52,209 +54,224 @@ import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_
  * <p>Because the table source requires a decoding format, we are discovering the format using the
  * provided {@link FactoryUtil} for convenience.
  */
-public final class DorisDynamicTableFactory implements  DynamicTableSourceFactory , DynamicTableSinkFactory {
-
-	public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
-	public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name.");
-	public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");
-	public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");
-
-	// doris options
-	private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
-			.key("doris.read.field")
-			.stringType()
-			.noDefaultValue()
-			.withDescription("List of column names in the Doris table, separated by commas");
-
-	private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
-			.key("doris.filter.query")
-			.stringType()
-			.noDefaultValue()
-			.withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
-
-	private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
-			.key("doris.request.tablet.size")
-			.intType()
-			.defaultValue(DORIS_TABLET_SIZE_DEFAULT)
-			.withDescription("");
-
-	private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
-			.key("doris.request.connect.timeout.ms")
-			.intType()
-			.defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
-			.withDescription("");
-
-	private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
-			.key("doris.request.read.timeout.ms")
-			.intType()
-			.defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
-			.withDescription("");
-
-	private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
-			.key("doris.request.query.timeout.s")
-			.intType()
-			.defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
-			.withDescription("");
-
-	private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
-			.key("doris.request.retries")
-			.intType()
-			.defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
-			.withDescription("");
-
-	private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
-			.key("doris.deserialize.arrow.async")
-			.booleanType()
-			.defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
-			.withDescription("");
-
-	private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
-			.key("doris.request.retriesdoris.deserialize.queue.size")
-			.intType()
-			.defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
-			.withDescription("");
-
-
-	private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
-			.key("doris.batch.size")
-			.intType()
-			.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
-			.withDescription("");
-
-	private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
-			.key("doris.exec.mem.limit")
-			.longType()
-			.defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
-			.withDescription("");
-
-	// flink write config options
-	private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
-			.key("sink.batch.size")
-			.intType()
-			.defaultValue(100)
-			.withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
-					" of records, will flush data. The default value is 100.");
-
-	private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
-			.key("sink.max-retries")
-			.intType()
-			.defaultValue(3)
-			.withDescription("the max retry times if writing records to database failed.");
-
-	private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
-			.key("sink.batch.interval")
-			.durationType()
-			.defaultValue(Duration.ofSeconds(1))
-			.withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
-					"default value is 1s.");
-
-
-	@Override
-	public String factoryIdentifier() {
-		return "doris"; // used for matching to `connector = '...'`
-	}
-
-	@Override
-	public Set<ConfigOption<?>> requiredOptions() {
-		final Set<ConfigOption<?>> options = new HashSet<>();
-		options.add(FENODES);
-		options.add(TABLE_IDENTIFIER);
-		return options;
-	}
-
-	@Override
-	public Set<ConfigOption<?>> optionalOptions() {
-		final Set<ConfigOption<?>> options = new HashSet<>();
-		options.add(FENODES);
-		options.add(TABLE_IDENTIFIER);
-		options.add(USERNAME);
-		options.add(PASSWORD);
-
-		options.add(DORIS_READ_FIELD);
-		options.add(DORIS_FILTER_QUERY);
-		options.add(DORIS_TABLET_SIZE);
-		options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
-		options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
-		options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
-		options.add(DORIS_REQUEST_RETRIES);
-		options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
-		options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
-		options.add(DORIS_BATCH_SIZE);
-		options.add(DORIS_EXEC_MEM_LIMIT);
-
-		options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
-		options.add(SINK_MAX_RETRIES);
-		options.add(SINK_BUFFER_FLUSH_INTERVAL);
-		return options;
-	}
-
-	@Override
-	public DynamicTableSource createDynamicTableSource(Context context) {
-		// either implement your custom validation logic here ...
-		// or use the provided helper utility
-		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-		// validate all options
-		helper.validate();
-		// get the validated options
-		final ReadableConfig options = helper.getOptions();
-		// derive the produced data type (excluding computed columns) from the catalog table
-		final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
-		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-		// create and return dynamic table source
-		return new DorisDynamicTableSource(
-				getDorisOptions(helper.getOptions()),
-				getDorisReadOptions(helper.getOptions()),
-				physicalSchema);
-	}
-
-	private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
-		final String fenodes = readableConfig.get(FENODES);
-		final DorisOptions.Builder builder = DorisOptions.builder()
-				.setFenodes(fenodes)
-				.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
-
-		readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
-		readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
-		return builder.build();
-	}
-
-	private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
-		final DorisReadOptions.Builder builder = DorisReadOptions.builder();
-		builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
-				.setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
-				.setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
-				.setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
-				.setReadFields(readableConfig.get(DORIS_READ_FIELD))
-				.setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
-				.setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
-				.setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
-				.setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
-				.setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
-				.setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
-		return builder.build();
-	}
-
-	private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig) {
-		final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
-		builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
-		builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
-		builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
-		return builder.build();
-	}
-
-
-	@Override
-	public DynamicTableSink createDynamicTableSink(Context context) {
-		// either implement your custom validation logic here ...
-		// or use the provided helper utility
-		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
-		// validate all options
-		helper.validate();
-		// create and return dynamic table source
-		return new DorisDynamicTableSink(
-				getDorisOptions(helper.getOptions()),
-				getDorisReadOptions(helper.getOptions()),
-				getDorisExecutionOptions(helper.getOptions())
-		);
-	}
+public final class DorisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
+
+    public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
+    public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the jdbc table name.");
+    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the jdbc user name.");
+    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the jdbc password.");
+
+    // doris options
+    private static final ConfigOption<String> DORIS_READ_FIELD = ConfigOptions
+            .key("doris.read.field")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("List of column names in the Doris table, separated by commas");
+
+    private static final ConfigOption<String> DORIS_FILTER_QUERY = ConfigOptions
+            .key("doris.filter.query")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Filter expression of the query, which is transparently transmitted to Doris. Doris uses this expression to complete source-side data filtering");
+
+    private static final ConfigOption<Integer> DORIS_TABLET_SIZE = ConfigOptions
+            .key("doris.request.tablet.size")
+            .intType()
+            .defaultValue(DORIS_TABLET_SIZE_DEFAULT)
+            .withDescription("");
+
+    private static final ConfigOption<Integer> DORIS_REQUEST_CONNECT_TIMEOUT_MS = ConfigOptions
+            .key("doris.request.connect.timeout.ms")
+            .intType()
+            .defaultValue(DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT)
+            .withDescription("");
+
+    private static final ConfigOption<Integer> DORIS_REQUEST_READ_TIMEOUT_MS = ConfigOptions
+            .key("doris.request.read.timeout.ms")
+            .intType()
+            .defaultValue(DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT)
+            .withDescription("");
+
+    private static final ConfigOption<Integer> DORIS_REQUEST_QUERY_TIMEOUT_S = ConfigOptions
+            .key("doris.request.query.timeout.s")
+            .intType()
+            .defaultValue(DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT)
+            .withDescription("");
+
+    private static final ConfigOption<Integer> DORIS_REQUEST_RETRIES = ConfigOptions
+            .key("doris.request.retries")
+            .intType()
+            .defaultValue(DORIS_REQUEST_RETRIES_DEFAULT)
+            .withDescription("");
+
+    private static final ConfigOption<Boolean> DORIS_DESERIALIZE_ARROW_ASYNC = ConfigOptions
+            .key("doris.deserialize.arrow.async")
+            .booleanType()
+            .defaultValue(DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT)
+            .withDescription("");
+
+    private static final ConfigOption<Integer> DORIS_DESERIALIZE_QUEUE_SIZE = ConfigOptions
+            .key("doris.request.retriesdoris.deserialize.queue.size")
+            .intType()
+            .defaultValue(DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT)
+            .withDescription("");
+
+
+    private static final ConfigOption<Integer> DORIS_BATCH_SIZE = ConfigOptions
+            .key("doris.batch.size")
+            .intType()
+            .defaultValue(DORIS_BATCH_SIZE_DEFAULT)
+            .withDescription("");
+
+    private static final ConfigOption<Long> DORIS_EXEC_MEM_LIMIT = ConfigOptions
+            .key("doris.exec.mem.limit")
+            .longType()
+            .defaultValue(DORIS_EXEC_MEM_LIMIT_DEFAULT)
+            .withDescription("");
+
+    // flink write config options
+    private static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS = ConfigOptions
+            .key("sink.batch.size")
+            .intType()
+            .defaultValue(100)
+            .withDescription("the flush max size (includes all append, upsert and delete records), over this number" +
+                    " of records, will flush data. The default value is 100.");
+
+    private static final ConfigOption<Integer> SINK_MAX_RETRIES = ConfigOptions
+            .key("sink.max-retries")
+            .intType()
+            .defaultValue(3)
+            .withDescription("the max retry times if writing records to database failed.");
+
+    private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions
+            .key("sink.batch.interval")
+            .durationType()
+            .defaultValue(Duration.ofSeconds(1))
+            .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " +
+                    "default value is 1s.");
+
+
+    // Prefix for Doris StreamLoad specific properties.
+    public static final String STREAM_LOAD_PROP_PREFIX = "sink.properties.";
+
+    @Override
+    public String factoryIdentifier() {
+        return "doris"; // used for matching to `connector = '...'`
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FENODES);
+        options.add(TABLE_IDENTIFIER);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(FENODES);
+        options.add(TABLE_IDENTIFIER);
+        options.add(USERNAME);
+        options.add(PASSWORD);
+
+        options.add(DORIS_READ_FIELD);
+        options.add(DORIS_FILTER_QUERY);
+        options.add(DORIS_TABLET_SIZE);
+        options.add(DORIS_REQUEST_CONNECT_TIMEOUT_MS);
+        options.add(DORIS_REQUEST_READ_TIMEOUT_MS);
+        options.add(DORIS_REQUEST_QUERY_TIMEOUT_S);
+        options.add(DORIS_REQUEST_RETRIES);
+        options.add(DORIS_DESERIALIZE_ARROW_ASYNC);
+        options.add(DORIS_DESERIALIZE_QUEUE_SIZE);
+        options.add(DORIS_BATCH_SIZE);
+        options.add(DORIS_EXEC_MEM_LIMIT);
+
+        options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
+        options.add(SINK_MAX_RETRIES);
+        options.add(SINK_BUFFER_FLUSH_INTERVAL);
+        return options;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        // either implement your custom validation logic here ...
+        // or use the provided helper utility
+        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        // validate all options
+        helper.validate();
+        // get the validated options
+        final ReadableConfig options = helper.getOptions();
+        // derive the produced data type (excluding computed columns) from the catalog table
+        final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+        TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
+        // create and return dynamic table source
+        return new DorisDynamicTableSource(
+                getDorisOptions(helper.getOptions()),
+                getDorisReadOptions(helper.getOptions()),
+                physicalSchema);
+    }
+
+    private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
+        final String fenodes = readableConfig.get(FENODES);
+        final DorisOptions.Builder builder = DorisOptions.builder()
+                .setFenodes(fenodes)
+                .setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));
+
+        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
+        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
+        return builder.build();
+    }
+
+    private DorisReadOptions getDorisReadOptions(ReadableConfig readableConfig) {
+        final DorisReadOptions.Builder builder = DorisReadOptions.builder();
+        builder.setDeserializeArrowAsync(readableConfig.get(DORIS_DESERIALIZE_ARROW_ASYNC))
+                .setDeserializeQueueSize(readableConfig.get(DORIS_DESERIALIZE_QUEUE_SIZE))
+                .setExecMemLimit(readableConfig.get(DORIS_EXEC_MEM_LIMIT))
+                .setFilterQuery(readableConfig.get(DORIS_FILTER_QUERY))
+                .setReadFields(readableConfig.get(DORIS_READ_FIELD))
+                .setRequestQueryTimeoutS(readableConfig.get(DORIS_REQUEST_QUERY_TIMEOUT_S))
+                .setRequestBatchSize(readableConfig.get(DORIS_BATCH_SIZE))
+                .setRequestConnectTimeoutMs(readableConfig.get(DORIS_REQUEST_CONNECT_TIMEOUT_MS))
+                .setRequestReadTimeoutMs(readableConfig.get(DORIS_REQUEST_READ_TIMEOUT_MS))
+                .setRequestRetries(readableConfig.get(DORIS_REQUEST_RETRIES))
+                .setRequestTabletSize(readableConfig.get(DORIS_TABLET_SIZE));
+        return builder.build();
+    }
+
+    private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig readableConfig, Properties streamLoadProp) {
+        final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder();
+        builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
+        builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
+        builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+        builder.setStreamLoadProp(streamLoadProp);
+        return builder.build();
+    }
+
+    private Properties getStreamLoadProp(Map<String, String> tableOptions) {
+        final Properties streamLoadProp = new Properties();
+
+        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
+            if (entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)) {
+                String subKey = entry.getKey().substring(STREAM_LOAD_PROP_PREFIX.length());
+                streamLoadProp.put(subKey, entry.getValue());
+            }
+        }
+        return streamLoadProp;
+    }
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        // validate all options
+        helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
+
+        Properties streamLoadProp = getStreamLoadProp(context.getCatalogTable().getOptions());
+        // create and return dynamic table source
+        return new DorisDynamicTableSink(
+                getDorisOptions(helper.getOptions()),
+                getDorisReadOptions(helper.getOptions()),
+                getDorisExecutionOptions(helper.getOptions(), streamLoadProp)
+        );
+    }
 }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index feeab96..dc710d7 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -63,7 +63,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
 
     @Override
     public DynamicTableSink copy() {
-        return new DorisDynamicTableSink(options,readOptions,executionOptions);
+        return new DorisDynamicTableSink(options, readOptions, executionOptions);
     }
 
     @Override
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
index ab523aa..43d9e5f 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java
@@ -46,56 +46,56 @@ import java.util.List;
  * where we instantiate the required {@link SourceFunction} and its {@link DeserializationSchema} for
  * runtime. Both instances are parameterized to return internal data structures (i.e. {@link RowData}).
  */
-public final class DorisDynamicTableSource implements ScanTableSource ,LookupTableSource {
+public final class DorisDynamicTableSource implements ScanTableSource, LookupTableSource {
 
-	private final DorisOptions options;
-	private final  DorisReadOptions readOptions;
-	private TableSchema physicalSchema;
-	private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
+    private final DorisOptions options;
+    private final DorisReadOptions readOptions;
+    private TableSchema physicalSchema;
+    private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
 
-	public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions,TableSchema physicalSchema) {
-		 this.options = options;
-		 this.readOptions = readOptions;
-		 this.physicalSchema = physicalSchema;
-	}
+    public DorisDynamicTableSource(DorisOptions options, DorisReadOptions readOptions, TableSchema physicalSchema) {
+        this.options = options;
+        this.readOptions = readOptions;
+        this.physicalSchema = physicalSchema;
+    }
 
-	@Override
-	public ChangelogMode getChangelogMode() {
-		// in our example the format decides about the changelog mode
-		// but it could also be the source itself
-		return ChangelogMode.insertOnly();
-	}
+    @Override
+    public ChangelogMode getChangelogMode() {
+        // in our example the format decides about the changelog mode
+        // but it could also be the source itself
+        return ChangelogMode.insertOnly();
+    }
 
-	@Override
-	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
-		List<PartitionDefinition> dorisPartitions ;
-		try {
-			dorisPartitions = RestService.findPartitions(options,readOptions,LOG);
-		} catch (DorisException e) {
-			throw new RuntimeException("can not fetch partitions");
-		}
-		DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
-				.setFenodes(options.getFenodes())
-				.setUsername(options.getUsername())
-				.setPassword(options.getPassword())
-				.setTableIdentifier(options.getTableIdentifier())
-				.setPartitions(dorisPartitions)
-				.setReadOptions(readOptions);
-		return InputFormatProvider.of(builder.build());
-	}
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+        List<PartitionDefinition> dorisPartitions;
+        try {
+            dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
+        } catch (DorisException e) {
+            throw new RuntimeException("can not fetch partitions");
+        }
+        DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
+                .setFenodes(options.getFenodes())
+                .setUsername(options.getUsername())
+                .setPassword(options.getPassword())
+                .setTableIdentifier(options.getTableIdentifier())
+                .setPartitions(dorisPartitions)
+                .setReadOptions(readOptions);
+        return InputFormatProvider.of(builder.build());
+    }
 
-	@Override
-	public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
-		return null;
-	}
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
+        return null;
+    }
 
-	@Override
-	public DynamicTableSource copy() {
-		return new DorisDynamicTableSource(options,readOptions,physicalSchema);
-	}
+    @Override
+    public DynamicTableSource copy() {
+        return new DorisDynamicTableSource(options, readOptions, physicalSchema);
+    }
 
-	@Override
-	public String asSummaryString() {
-		return "Doris Table Source";
-	}
+    @Override
+    public String asSummaryString() {
+        return "Doris Table Source";
+    }
 }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
index 75e7fc9..c75a88f 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java
@@ -45,183 +45,183 @@ import java.util.List;
 @Internal
 public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTableInputSplit> implements ResultTypeQueryable<RowData> {
 
-	private static final long serialVersionUID = 1L;
-	private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
-
-	private DorisOptions options;
-	private DorisReadOptions readOptions;
-	private List<PartitionDefinition>  dorisPartitions;
-	private TypeInformation<RowData> rowDataTypeInfo;
-
-	private ScalaValueReader scalaValueReader;
-	private transient boolean hasNext;
-
-	public DorisRowDataInputFormat(DorisOptions options,List<PartitionDefinition>  dorisPartitions,DorisReadOptions readOptions) {
-		this.options = options;
-		this.dorisPartitions = dorisPartitions;
-		this.readOptions = readOptions;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		//do nothing here
-	}
-
-	@Override
-	public void openInputFormat() {
-		//called once per inputFormat (on open)
-	}
-
-	@Override
-	public void closeInputFormat() {
-		//called once per inputFormat (on close)
-	}
-
-	/**
-	 * Connects to the source database and executes the query in a <b>parallel
-	 * fashion</b> if
-	 * this {@link InputFormat} is built using a parameterized query (i.e. using
-	 * a {@link PreparedStatement})
-	 * and a proper {@link  }, in a <b>non-parallel
-	 * fashion</b> otherwise.
-	 *
-	 * @param inputSplit which is ignored if this InputFormat is executed as a
-	 *                   non-parallel source,
-	 *                   a "hook" to the query parameters otherwise (using its
-	 *                   <i>splitNumber</i>)
-	 * @throws IOException if there's an error during the execution of the query
-	 */
-	@Override
-	public void open(DorisTableInputSplit inputSplit) throws IOException {
-		scalaValueReader = new ScalaValueReader(inputSplit.partition, options,readOptions);
-		hasNext = scalaValueReader.hasNext();
-	}
-
-	/**
-	 * Closes all resources used.
-	 *
-	 * @throws IOException Indicates that a resource could not be closed.
-	 */
-	@Override
-	public void close() throws IOException {
-
-	}
-
-	@Override
-	public TypeInformation<RowData> getProducedType() {
-		return rowDataTypeInfo;
-	}
-
-	/**
-	 * Checks whether all data has been read.
-	 *
-	 * @return boolean value indication whether all data has been read.
-	 * @throws IOException
-	 */
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !hasNext;
-	}
-
-	/**
-	 * Stores the next resultSet row in a tuple.
-	 *
-	 * @param reuse row to be reused.
-	 * @return row containing next {@link RowData}
-	 * @throws IOException
-	 */
-	@Override
-	public RowData nextRecord(RowData reuse) throws IOException {
-		if (!hasNext) {
-			return null;
-		}
-		List next = (List)scalaValueReader.next();
-		GenericRowData genericRowData = new GenericRowData(next.size());
-		for(int i =0;i<next.size();i++){
-			genericRowData.setField(i, next.get(i));
-		}
-		//update hasNext after we've read the record
-		hasNext = scalaValueReader.hasNext();
-		return genericRowData;
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-		return cachedStatistics;
-	}
-
-	@Override
-	public DorisTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
-		List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
-		int splitNum = 0;
-		for (PartitionDefinition partition : dorisPartitions) {
-			dorisSplits.add(new DorisTableInputSplit(splitNum++,partition));
-		}
-		LOG.info("DorisTableInputSplit Num:{}",dorisSplits.size());
-		return dorisSplits.toArray(new DorisTableInputSplit[0]);
-	}
-
-	@Override
-	public InputSplitAssigner getInputSplitAssigner(DorisTableInputSplit[] inputSplits) {
-		return new DefaultInputSplitAssigner(inputSplits);
-	}
-
-	/**
-	 * A builder used to set parameters to the output format's configuration in a fluent way.
-	 *
-	 * @return builder
-	 */
-	public static Builder builder() {
-		return new Builder();
-	}
-
-	/**
-	 * Builder for {@link DorisRowDataInputFormat}.
-	 */
-	public static class Builder {
-		private DorisOptions.Builder optionsBuilder;
-		private List<PartitionDefinition> partitions;
-		private DorisReadOptions readOptions;
-
-
-		public Builder() {
-			this.optionsBuilder = DorisOptions.builder();
-		}
-
-		public Builder setFenodes(String fenodes) {
-			this.optionsBuilder.setFenodes(fenodes);
-			return this;
-		}
-
-		public Builder setUsername(String username) {
-			this.optionsBuilder.setUsername(username);
-			return this;
-		}
-
-		public Builder setPassword(String password) {
-			this.optionsBuilder.setPassword(password);
-			return this;
-		}
-
-		public Builder setTableIdentifier(String tableIdentifier) {
-			this.optionsBuilder.setTableIdentifier(tableIdentifier);
-			return this;
-		}
-
-		public Builder setPartitions(List<PartitionDefinition> partitions) {
-			this.partitions = partitions;
-			return this;
-		}
-
-		public Builder setReadOptions(DorisReadOptions readOptions) {
-			this.readOptions = readOptions;
-			return this;
-		}
-
-		public DorisRowDataInputFormat build() {
-			return new DorisRowDataInputFormat(
-					optionsBuilder.build(),partitions,readOptions
-				 );
-		}
-	}
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(DorisRowDataInputFormat.class);
+
+    private DorisOptions options;
+    private DorisReadOptions readOptions;
+    private List<PartitionDefinition> dorisPartitions;
+    private TypeInformation<RowData> rowDataTypeInfo;
+
+    private ScalaValueReader scalaValueReader;
+    private transient boolean hasNext;
+
+    public DorisRowDataInputFormat(DorisOptions options, List<PartitionDefinition> dorisPartitions, DorisReadOptions readOptions) {
+        this.options = options;
+        this.dorisPartitions = dorisPartitions;
+        this.readOptions = readOptions;
+    }
+
+    @Override
+    public void configure(Configuration parameters) {
+        //do nothing here
+    }
+
+    @Override
+    public void openInputFormat() {
+        //called once per inputFormat (on open)
+    }
+
+    @Override
+    public void closeInputFormat() {
+        //called once per inputFormat (on close)
+    }
+
+    /**
+     * Connects to the source database and executes the query in a <b>parallel
+     * fashion</b> if
+     * this {@link InputFormat} is built using a parameterized query (i.e. using
+     * a {@link PreparedStatement})
+     * and a proper parameter values provider, in a <b>non-parallel
+     * fashion</b> otherwise.
+     *
+     * @param inputSplit which is ignored if this InputFormat is executed as a
+     *                   non-parallel source,
+     *                   a "hook" to the query parameters otherwise (using its
+     *                   <i>splitNumber</i>)
+     * @throws IOException if there's an error during the execution of the query
+     */
+    @Override
+    public void open(DorisTableInputSplit inputSplit) throws IOException {
+        scalaValueReader = new ScalaValueReader(inputSplit.partition, options, readOptions);
+        hasNext = scalaValueReader.hasNext();
+    }
+
+    /**
+     * Closes all resources used.
+     *
+     * @throws IOException Indicates that a resource could not be closed.
+     */
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return rowDataTypeInfo;
+    }
+
+    /**
+     * Checks whether all data has been read.
+     *
+     * @return boolean value indicating whether all data has been read.
+     * @throws IOException
+     */
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return !hasNext;
+    }
+
+    /**
+     * Copies the next row returned by the value reader into a new {@link GenericRowData}.
+     *
+     * @param reuse row to be reused.
+     * @return row containing next {@link RowData}
+     * @throws IOException
+     */
+    @Override
+    public RowData nextRecord(RowData reuse) throws IOException {
+        if (!hasNext) {
+            return null;
+        }
+        List next = (List) scalaValueReader.next();
+        GenericRowData genericRowData = new GenericRowData(next.size());
+        for (int i = 0; i < next.size(); i++) {
+            genericRowData.setField(i, next.get(i));
+        }
+        //update hasNext after we've read the record
+        hasNext = scalaValueReader.hasNext();
+        return genericRowData;
+    }
+
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+        return cachedStatistics;
+    }
+
+    @Override
+    public DorisTableInputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        List<DorisTableInputSplit> dorisSplits = new ArrayList<>();
+        int splitNum = 0;
+        for (PartitionDefinition partition : dorisPartitions) {
+            dorisSplits.add(new DorisTableInputSplit(splitNum++, partition));
+        }
+        LOG.info("DorisTableInputSplit Num:{}", dorisSplits.size());
+        return dorisSplits.toArray(new DorisTableInputSplit[0]);
+    }
+
+    @Override
+    public InputSplitAssigner getInputSplitAssigner(DorisTableInputSplit[] inputSplits) {
+        return new DefaultInputSplitAssigner(inputSplits);
+    }
+
+    /**
+     * A builder used to set parameters to the input format's configuration in a fluent way.
+     *
+     * @return builder
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link DorisRowDataInputFormat}.
+     */
+    public static class Builder {
+        private DorisOptions.Builder optionsBuilder;
+        private List<PartitionDefinition> partitions;
+        private DorisReadOptions readOptions;
+
+
+        public Builder() {
+            this.optionsBuilder = DorisOptions.builder();
+        }
+
+        public Builder setFenodes(String fenodes) {
+            this.optionsBuilder.setFenodes(fenodes);
+            return this;
+        }
+
+        public Builder setUsername(String username) {
+            this.optionsBuilder.setUsername(username);
+            return this;
+        }
+
+        public Builder setPassword(String password) {
+            this.optionsBuilder.setPassword(password);
+            return this;
+        }
+
+        public Builder setTableIdentifier(String tableIdentifier) {
+            this.optionsBuilder.setTableIdentifier(tableIdentifier);
+            return this;
+        }
+
+        public Builder setPartitions(List<PartitionDefinition> partitions) {
+            this.partitions = partitions;
+            return this;
+        }
+
+        public Builder setReadOptions(DorisReadOptions readOptions) {
+            this.readOptions = readOptions;
+            return this;
+        }
+
+        public DorisRowDataInputFormat build() {
+            return new DorisRowDataInputFormat(
+                    optionsBuilder.build(), partitions, readOptions
+            );
+        }
+    }
 }
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index ef16f33..b9a7708 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -17,6 +17,7 @@
 package org.apache.doris.flink.table;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.models.RespContent;
 import org.slf4j.Logger;
@@ -31,17 +32,21 @@ import java.io.Serializable;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Calendar;
+import java.util.Date;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.UUID;
 
 /**
  * DorisStreamLoad
  **/
-public class DorisStreamLoad implements Serializable{
+public class DorisStreamLoad implements Serializable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -54,8 +59,9 @@ public class DorisStreamLoad implements Serializable{
     private String db;
     private String tbl;
     private String authEncoding;
+    private Properties streamLoadProp;
 
-    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
+    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd, Properties streamLoadProp) {
         this.hostPort = hostPort;
         this.db = db;
         this.tbl = tbl;
@@ -63,6 +69,7 @@ public class DorisStreamLoad implements Serializable{
         this.passwd = passwd;
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.streamLoadProp = streamLoadProp;
     }
 
     public String getLoadUrlStr() {
@@ -89,6 +96,9 @@ public class DorisStreamLoad implements Serializable{
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
         conn.addRequestProperty("label", label);
+        for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) {
+            conn.addRequestProperty(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
+        }
         conn.setDoOutput(true);
         conn.setDoInput(true);
         return conn;
@@ -104,6 +114,7 @@ public class DorisStreamLoad implements Serializable{
             this.respMsg = respMsg;
             this.respContent = respContent;
         }
+
         @Override
         public String toString() {
             StringBuilder sb = new StringBuilder();
@@ -116,14 +127,14 @@ public class DorisStreamLoad implements Serializable{
 
     public void load(String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(value);
-        LOG.info("Streamload Response:{}",loadResponse);
-        if(loadResponse.status != 200){
+        LOG.info("Streamload Response:{}", loadResponse);
+        if (loadResponse.status != 200) {
             throw new StreamLoadException("stream load error: " + loadResponse.respContent);
-        }else{
+        } else {
             ObjectMapper obj = new ObjectMapper();
             try {
                 RespContent respContent = obj.readValue(loadResponse.respContent, RespContent.class);
-                if(!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())){
+                if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                     throw new StreamLoadException("stream load error: " + respContent.getMessage());
                 }
             } catch (IOException e) {
@@ -133,11 +144,13 @@ public class DorisStreamLoad implements Serializable{
     }
 
     private LoadResponse loadBatch(String value) {
-        Calendar calendar = Calendar.getInstance();
-        String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
-                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
-                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
-                UUID.randomUUID().toString().replaceAll("-", ""));
+        String label = streamLoadProp.getProperty("label");
+        if (StringUtils.isBlank(label)) {
+            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd_HHmmss");
+            String formatDate = sdf.format(new Date());
+            label = String.format("flink_connector_%s_%s",formatDate,
+                    UUID.randomUUID().toString().replaceAll("-", ""));
+        }
 
         HttpURLConnection feConn = null;
         HttpURLConnection beConn = null;
diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
index f245dac..5e81cc1 100644
--- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
+++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisTableInputSplit.java
@@ -22,14 +22,16 @@ import org.apache.flink.core.io.InputSplit;
 /**
  * DorisTableInputSplit
  **/
-public class DorisTableInputSplit implements InputSplit, java.io.Serializable{
+public class DorisTableInputSplit implements InputSplit, java.io.Serializable {
 
-    /** The number of the split. */
+    /**
+     * The number of the split.
+     */
     private final int splitNumber;
 
     protected final PartitionDefinition partition;
 
-    public DorisTableInputSplit(int splitNumber,PartitionDefinition partition) {
+    public DorisTableInputSplit(int splitNumber, PartitionDefinition partition) {
         super();
         this.splitNumber = splitNumber;
         this.partition = partition;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org