You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ji...@apache.org on 2021/09/28 09:46:29 UTC

[incubator-doris] branch master updated: [Feature] support spark connector sink stream data to doris (#6761)

This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d47100  [Feature] support spark connector sink stream data to doris (#6761)
8d47100 is described below

commit 8d471007a62ffbd1d628620c4f0c44b8b6e94a8a
Author: chovy <zh...@163.com>
AuthorDate: Tue Sep 28 17:46:19 2021 +0800

    [Feature] support spark connector sink stream data to doris (#6761)
    
    * [Feature] support spark connector sink stream data to doris
    
    * [Doc] Add spark-connector batch/stream writing instructions
    
    * add license and remove meaningless blanks code
    
    Co-authored-by: wei.zhao <we...@aispeech.com>
---
 docs/en/extending-doris/spark-doris-connector.md   |  48 +++++++-
 .../zh-CN/extending-doris/spark-doris-connector.md |  53 ++++++++-
 extension/spark-doris-connector/pom.xml            |   6 +
 .../doris/spark/CachedDorisStreamLoadClient.java   |  63 +++++++++++
 .../org/apache/doris/spark/DorisStreamLoad.java    |  37 ++++++-
 .../java/org/apache/doris/spark/cfg/Settings.java  |  13 +++
 .../org/apache/doris/spark/cfg/SparkSettings.java  |   3 +-
 .../org/apache/doris/spark/rest/RestService.java   |   9 +-
 .../doris/spark/sql/DorisSourceProvider.scala      |  72 +++++-------
 .../apache/doris/spark/sql/DorisStreamWriter.scala | 122 +++++++++++++++++++++
 .../doris/spark/sql/TestStreamSinkDoris.scala      |  53 +++++++++
 11 files changed, 418 insertions(+), 61 deletions(-)

diff --git a/docs/en/extending-doris/spark-doris-connector.md b/docs/en/extending-doris/spark-doris-connector.md
index 4735574..909db2a 100644
--- a/docs/en/extending-doris/spark-doris-connector.md
+++ b/docs/en/extending-doris/spark-doris-connector.md
@@ -26,9 +26,10 @@ under the License.
 
 # Spark Doris Connector
 
-Spark Doris Connector can support reading data stored in Doris through Spark.
+Spark Doris Connector can support reading data stored in Doris and writing data to Doris through Spark.
 
-- The current version only supports reading data from `Doris`.
+- Support reading data from `Doris`.
+- Support `Spark DataFrame` batch/stream writing data to `Doris`
 - You can map the `Doris` table to` DataFrame` or `RDD`, it is recommended to use` DataFrame`.
 - Support the completion of data filtering on the `Doris` side to reduce the amount of data transmission.
 
@@ -57,8 +58,9 @@ sh build.sh 2 ## soark 2.x version, the default is 2.3.4
 After successful compilation, the file `doris-spark-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to `ClassPath` in `Spark` to use `Spark-Doris-Connector`. For example, `Spark` running in `Local` mode, put this file in the `jars/` folder. `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package.
 
 ## Example
+### Read
 
-### SQL
+#### SQL
 
 ```sql
 CREATE TEMPORARY VIEW spark_doris
@@ -73,7 +75,7 @@ OPTIONS(
 SELECT * FROM spark_doris;
 ```
 
-### DataFrame
+#### DataFrame
 
 ```scala
 val dorisSparkDF = spark.read.format("doris")
@@ -86,7 +88,7 @@ val dorisSparkDF = spark.read.format("doris")
 dorisSparkDF.show(5)
 ```
 
-### RDD
+#### RDD
 
 ```scala
 import org.apache.doris.spark._
@@ -101,6 +103,42 @@ val dorisSparkRDD = sc.dorisRDD(
 
 dorisSparkRDD.collect()
 ```
+### Write
+#### DataFrame(batch/stream)
+```scala
+## batch sink
+val mockDataDF = List(
+  (3, "440403001005", "21.cn"),
+  (1, "4404030013005", "22.cn"),
+  (33, null, "23.cn")
+).toDF("id", "mi_code", "mi_name")
+mockDataDF.show(5)
+
+mockDataDF.write.format("doris")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+	.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  .save()
+
+## stream sink(StructuredStreaming)
+val kafkaSource = spark.readStream
+  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
+  .option("startingOffsets", "latest")
+  .option("subscribe", "$YOUR_KAFKA_TOPICS")
+  .format("kafka")
+  .load()
+kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
+  .writeStream
+  .format("doris")
+  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+	.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  .start()
+  .awaitTermination()
+```
 
 ## Configuration
 
diff --git a/docs/zh-CN/extending-doris/spark-doris-connector.md b/docs/zh-CN/extending-doris/spark-doris-connector.md
index b8eb53e..fa69f38 100644
--- a/docs/zh-CN/extending-doris/spark-doris-connector.md
+++ b/docs/zh-CN/extending-doris/spark-doris-connector.md
@@ -26,9 +26,10 @@ under the License.
 
 # Spark Doris Connector
 
-Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据。
+Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
 
-- 当前版本只支持从`Doris`中读取数据。
+- 支持从`Doris`中读取数据
+- 支持`Spark DataFrame`批量/流式 写入`Doris`
 - 可以将`Doris`表映射为`DataFrame`或者`RDD`,推荐使用`DataFrame`。
 - 支持在`Doris`端完成数据过滤,减少数据传输量。
 
@@ -57,8 +58,9 @@ sh build.sh 2  ## soark 2.x版本,默认是2.3.4
 编译成功后,会在 `output/` 目录下生成文件 `doris-spark-1.0.0-SNAPSHOT.jar`。将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。
 
 ## 使用示例
+### 读取
 
-### SQL
+#### SQL
 
 ```sql
 CREATE TEMPORARY VIEW spark_doris
@@ -73,7 +75,7 @@ OPTIONS(
 SELECT * FROM spark_doris;
 ```
 
-### DataFrame
+#### DataFrame
 
 ```scala
 val dorisSparkDF = spark.read.format("doris")
@@ -86,7 +88,7 @@ val dorisSparkDF = spark.read.format("doris")
 dorisSparkDF.show(5)
 ```
 
-### RDD
+#### RDD
 
 ```scala
 import org.apache.doris.spark._
@@ -102,6 +104,47 @@ val dorisSparkRDD = sc.dorisRDD(
 dorisSparkRDD.collect()
 ```
 
+### 写入
+
+#### DataFrame(batch/stream)
+
+```scala
+## batch sink
+val mockDataDF = List(
+  (3, "440403001005", "21.cn"),
+  (1, "4404030013005", "22.cn"),
+  (33, null, "23.cn")
+).toDF("id", "mi_code", "mi_name")
+mockDataDF.show(5)
+
+mockDataDF.write.format("doris")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+	.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  .save()
+
+## stream sink(StructuredStreaming)
+val kafkaSource = spark.readStream
+  .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
+  .option("startingOffsets", "latest")
+  .option("subscribe", "$YOUR_KAFKA_TOPICS")
+  .format("kafka")
+  .load()
+kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
+  .writeStream
+  .format("doris")
+  .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
+  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+	.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+  .option("user", "$YOUR_DORIS_USERNAME")
+  .option("password", "$YOUR_DORIS_PASSWORD")
+  .start()
+  .awaitTermination()
+```
+
+
+
 ## 配置
 
 ### 通用配置项
diff --git a/extension/spark-doris-connector/pom.xml b/extension/spark-doris-connector/pom.xml
index 5ba2c6e..e015f06 100644
--- a/extension/spark-doris-connector/pom.xml
+++ b/extension/spark-doris-connector/pom.xml
@@ -139,6 +139,12 @@
             </exclusions>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
new file mode 100644
index 0000000..01cada4
--- /dev/null
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.DorisException;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * a cached streamload client for each partition
+ */
+public class CachedDorisStreamLoadClient {
+    private static final long cacheExpireTimeout = 30 * 60;
+    private static LoadingCache<SparkSettings, DorisStreamLoad> dorisStreamLoadLoadingCache;
+
+    static {
+        dorisStreamLoadLoadingCache = CacheBuilder.newBuilder()
+                .expireAfterWrite(cacheExpireTimeout, TimeUnit.SECONDS)
+                .removalListener(new RemovalListener<Object, Object>() {
+                    @Override
+                    public void onRemoval(RemovalNotification<Object, Object> removalNotification) {
+                        //do nothing
+                    }
+                })
+                .build(
+                        new CacheLoader<SparkSettings, DorisStreamLoad>() {
+                            @Override
+                            public DorisStreamLoad load(SparkSettings sparkSettings) throws IOException, DorisException {
+                                DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(sparkSettings);
+                                return dorisStreamLoad;
+                            }
+                        }
+                );
+    }
+
+    public static DorisStreamLoad getOrCreate(SparkSettings settings) throws ExecutionException {
+        DorisStreamLoad dorisStreamLoad = dorisStreamLoadLoadingCache.get(settings);
+        return dorisStreamLoad;
+    }
+}
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 0de3746..dcf569f 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -17,7 +17,11 @@
 package org.apache.doris.spark;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.rest.RestService;
 import org.apache.doris.spark.rest.models.RespContent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,12 +40,16 @@ import java.util.Arrays;
 import java.util.Base64;
 import java.util.Calendar;
 import java.util.List;
+import java.util.StringJoiner;
 import java.util.UUID;
 
 /**
  * DorisStreamLoad
  **/
 public class DorisStreamLoad implements Serializable{
+    public static final String FIELD_DELIMITER = "\t";
+    public static final String LINE_DELIMITER = "\n";
+    public static final String NULL_VALUE = "\\N";
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
 
@@ -65,6 +73,18 @@ public class DorisStreamLoad implements Serializable{
         this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
     }
 
+    public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
+        String hostPort = RestService.randomBackend(settings, LOG);
+        this.hostPort = hostPort;
+        String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
+        this.db = dbTable[0];
+        this.tbl = dbTable[1];
+        this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
+        this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
+        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
+        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+    }
+
     public String getLoadUrlStr() {
         return loadUrlStr;
     }
@@ -84,7 +104,6 @@ public class DorisStreamLoad implements Serializable{
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
         conn.setInstanceFollowRedirects(false);
         conn.setRequestMethod("PUT");
-        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
         conn.setRequestProperty("Authorization", "Basic " + authEncoding);
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
@@ -114,6 +133,22 @@ public class DorisStreamLoad implements Serializable{
         }
     }
 
+    public void load(List<List<Object>> rows) throws StreamLoadException {
+        StringJoiner lines = new StringJoiner(LINE_DELIMITER);
+        for (List<Object> row : rows) {
+            StringJoiner line = new StringJoiner(FIELD_DELIMITER);
+            for (Object field : row) {
+                if (field == null) {
+                    line.add(NULL_VALUE);
+                } else {
+                    line.add(field.toString());
+                }
+            }
+            lines.add(line.toString());
+        }
+        load(lines.toString());
+    }
+
     public void load(String value) throws StreamLoadException {
         LoadResponse loadResponse = loadBatch(value);
         LOG.info("Streamload Response:{}",loadResponse);
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
index 4677de7..4e376a4 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
@@ -98,4 +98,17 @@ public abstract class Settings {
         Properties copy = asProperties();
         return IOUtils.propsToString(copy);
     }
+
+    @Override
+    public int hashCode() {
+        return asProperties().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+        return asProperties().equals(((Settings) obj).asProperties());
+    }
 }
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
index 6568485..39fcd75 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
@@ -24,9 +24,10 @@ import org.apache.spark.SparkConf;
 import com.google.common.base.Preconditions;
 
 import scala.Option;
+import scala.Serializable;
 import scala.Tuple2;
 
-public class SparkSettings extends Settings {
+public class SparkSettings extends Settings implements Serializable {
 
     private final SparkConf cfg;
 
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 10126e8..0c9b5c4 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -65,7 +65,6 @@ import org.apache.doris.spark.rest.models.BackendRow;
 import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
-import org.apache.doris.spark.sql.DorisWriterOption;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
@@ -476,17 +475,13 @@ public class RestService implements Serializable {
 
     /**
      * choice a Doris BE node to request.
-     * @param options configuration of request
      * @param logger slf4j logger
      * @return the chosen one Doris BE node
      * @throws IllegalArgumentException BE nodes is illegal
      */
     @VisibleForTesting
-    public static String randomBackend(SparkSettings sparkSettings , DorisWriterOption options , Logger logger) throws DorisException, IOException {
-        // set user auth
-        sparkSettings.setProperty(DORIS_REQUEST_AUTH_USER,options.user());
-        sparkSettings.setProperty(DORIS_REQUEST_AUTH_PASSWORD,options.password());
-        String feNodes = options.feHostPort();
+    public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
+        String feNodes = sparkSettings.getProperty(DORIS_FENODES);
         String feNode = randomEndpoint(feNodes, logger);
         String beUrl =   String.format("http://%s" + BACKENDS,feNode);
         HttpGet httpGet = new HttpGet(beUrl);
diff --git a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index d8f951b..65f5250 100644
--- a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,25 +17,24 @@
 
 package org.apache.doris.spark.sql
 
-import java.io.IOException
-import java.util.StringJoiner
-
-import org.apache.commons.collections.CollectionUtils
 import org.apache.doris.spark.DorisStreamLoad
-import org.apache.doris.spark.cfg.SparkSettings
-import org.apache.doris.spark.exception.DorisException
-import org.apache.doris.spark.rest.RestService
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter, RelationProvider}
-import org.apache.spark.sql.types.StructType
-import org.json4s.jackson.Json
 
-import scala.collection.mutable.ListBuffer
-import scala.util.Random
+import java.io.IOException
+import java.util
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters.mapAsJavaMapConverter
 import scala.util.control.Breaks
 
-private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with Logging {
+private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with StreamWriteSupport with Logging {
   override def shortName(): String = "doris"
 
   override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
@@ -50,41 +49,29 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
                               mode: SaveMode, parameters: Map[String, String],
                               data: DataFrame): BaseRelation = {
 
-    val dorisWriterOption = DorisWriterOption(parameters)
     val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
-    // choose available be node
-    val choosedBeHost = RestService.randomBackend(sparkSettings, dorisWriterOption, log)
+    sparkSettings.merge(Utils.params(parameters, log).asJava)
     // init stream loader
-    val dorisStreamLoader = new DorisStreamLoad(choosedBeHost, dorisWriterOption.dbName, dorisWriterOption.tbName, dorisWriterOption.user, dorisWriterOption.password)
-    val fieldDelimiter: String = "\t"
-    val lineDelimiter: String = "\n"
-    val NULL_VALUE: String = "\\N"
+    val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
 
-    val maxRowCount = dorisWriterOption.maxRowCount
-    val maxRetryTimes = dorisWriterOption.maxRetryTimes
+    val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
+    val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
 
     data.rdd.foreachPartition(partition => {
-
-      val buffer = ListBuffer[String]()
+      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
       partition.foreach(row => {
-        val value = new StringJoiner(fieldDelimiter)
-        // create one row string
+        val line: util.List[Object] = new util.ArrayList[Object]()
         for (i <- 0 until row.size) {
           val field = row.get(i)
-          if (field == null) {
-            value.add(NULL_VALUE)
-          } else {
-            value.add(field.toString)
-          }
+          line.add(field.asInstanceOf[AnyRef])
         }
-        // add one row string to buffer
-        buffer += value.toString
-        if (buffer.size > maxRowCount) {
+        rowsBuffer.add(line)
+        if (rowsBuffer.size > maxRowCount) {
           flush
         }
       })
       // flush buffer
-      if (buffer.nonEmpty) {
+      if (!rowsBuffer.isEmpty) {
         flush
       }
 
@@ -98,16 +85,16 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
 
           for (i <- 1 to maxRetryTimes) {
             try {
-              dorisStreamLoader.load(buffer.mkString(lineDelimiter))
-              buffer.clear()
+              dorisStreamLoader.load(rowsBuffer)
+              rowsBuffer.clear()
               loop.break()
             }
             catch {
               case e: Exception =>
                 try {
                   Thread.sleep(1000 * i)
-                  dorisStreamLoader.load(buffer.mkString(lineDelimiter))
-                  buffer.clear()
+                  dorisStreamLoader.load(rowsBuffer)
+                  rowsBuffer.clear()
                 } catch {
                   case ex: InterruptedException =>
                     Thread.currentThread.interrupt()
@@ -136,8 +123,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
     }
   }
 
-
-
-
-
+  override def createStreamWriter(queryId: String, structType: StructType, outputMode: OutputMode, dataSourceOptions: DataSourceOptions): StreamWriter = {
+    val sparkSettings = new SparkSettings(new SparkConf())
+    sparkSettings.merge(Utils.params(dataSourceOptions.asMap().toMap, log).asJava)
+    new DorisStreamWriter(sparkSettings)
+  }
 }
diff --git a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala
new file mode 100644
index 0000000..60d2c78
--- /dev/null
+++ b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
+
+import java.io.IOException
+import java.util
+import scala.util.control.Breaks
+
+/**
+ * A [[StreamWriter]] for Apache Doris streaming writing.
+ *
+ * @param settings params for writing doris table
+ */
+class DorisStreamWriter(settings: SparkSettings) extends StreamWriter {
+  override def createWriterFactory(): DorisStreamWriterFactory = DorisStreamWriterFactory(settings)
+
+  override def commit(l: Long, writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
+
+  override def abort(l: Long, writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
+
+}
+
+/**
+ * A [[DataWriterFactory]] for Apache Doris streaming writing. Will be serialized and sent to executors to generate
+ * the per-task data writers.
+ *
+ * @param settings params for writing doris table
+ */
+case class DorisStreamWriterFactory(settings: SparkSettings) extends DataWriterFactory[Row] {
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+    new DorisStreamDataWriter(settings)
+  }
+}
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't need to really send one.
+ */
+case object DorisWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[DataWriter]] for Apache Doris streaming writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param settings params for writing doris table
+ */
+class DorisStreamDataWriter(settings: SparkSettings) extends DataWriter[Row] {
+  val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
+  val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
+  val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+  val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
+
+  override def write(row: Row): Unit = {
+    val line: util.List[Object] = new util.ArrayList[Object]()
+    for (i <- 0 until row.size) {
+      val field = row.get(i)
+      line.add(field.asInstanceOf[AnyRef])
+    }
+    if (!line.isEmpty) {
+      rowsBuffer.add(line)
+    }
+    if (rowsBuffer.size >= maxRowCount) {
+      // commit when buffer is full
+      commit()
+    }
+  }
+
+  override def commit(): WriterCommitMessage = {
+    // we don't commit request until rows-buffer received some rows
+    val loop = new Breaks
+    loop.breakable {
+      for (i <- 1 to maxRetryTimes) {
+        try {
+          if (!rowsBuffer.isEmpty) {
+            dorisStreamLoader.load(rowsBuffer)
+          }
+          rowsBuffer.clear()
+          loop.break()
+        }
+        catch {
+          case e: Exception =>
+            try {
+              Thread.sleep(1000 * i)
+              if (!rowsBuffer.isEmpty) {
+                dorisStreamLoader.load(rowsBuffer)
+              }
+              rowsBuffer.clear()
+            } catch {
+              case ex: InterruptedException =>
+                Thread.currentThread.interrupt()
+                throw new IOException("unable to flush; interrupted while doing another attempt", e)
+            }
+        }
+      }
+    }
+    DorisWriterCommitMessage
+  }
+
+  override def abort(): Unit = {
+  }
+}
\ No newline at end of file
diff --git a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
new file mode 100644
index 0000000..c62c9fb
--- /dev/null
+++ b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.spark.sql.SparkSession
+
+object TestStreamSinkDoris {
+  val kafkaServers = ""
+  val kafkaTopics = ""
+  val dorisFeNodes = "your_doris_host_port"
+  val dorisUser = "root"
+  val dorisPwd = ""
+  val dorisTable = "test.test_tbl"
+
+  def main(args: Array[String]): Unit = {
+    val sparkSession = SparkSession.builder()
+      .master("local")
+      .getOrCreate()
+
+    val dataFrame = sparkSession.readStream
+      .option("kafka.bootstrap.servers", kafkaServers)
+      .option("startingOffsets", "latest")
+      .option("subscribe", kafkaTopics)
+      .format("kafka")
+      .option("failOnDataLoss", false)
+      .load()
+
+    dataFrame.selectExpr("CAST(timestamp AS STRING)", "CAST(partition as STRING)")
+      .writeStream
+      .format("doris")
+      .option("checkpointLocation", "/tmp/test")
+      .option("doris.table.identifier", dorisTable)
+      .option("doris.fenodes", dorisFeNodes)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .start().awaitTermination()
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org