Posted to commits@doris.apache.org by ji...@apache.org on 2021/09/28 09:46:29 UTC
[incubator-doris] branch master updated: [Feature] support spark
connector sink stream data to doris (#6761)
This is an automated email from the ASF dual-hosted git repository.
jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8d47100 [Feature] support spark connector sink stream data to doris (#6761)
8d47100 is described below
commit 8d471007a62ffbd1d628620c4f0c44b8b6e94a8a
Author: chovy <zh...@163.com>
AuthorDate: Tue Sep 28 17:46:19 2021 +0800
[Feature] support spark connector sink stream data to doris (#6761)
* [Feature] support spark connector sink stream data to doris
* [Doc] Add spark-connector batch/stream writing instructions
* add license and remove meaningless blank lines
Co-authored-by: wei.zhao <we...@aispeech.com>
---
docs/en/extending-doris/spark-doris-connector.md | 48 +++++++-
.../zh-CN/extending-doris/spark-doris-connector.md | 53 ++++++++-
extension/spark-doris-connector/pom.xml | 6 +
.../doris/spark/CachedDorisStreamLoadClient.java | 63 +++++++++++
.../org/apache/doris/spark/DorisStreamLoad.java | 37 ++++++-
.../java/org/apache/doris/spark/cfg/Settings.java | 13 +++
.../org/apache/doris/spark/cfg/SparkSettings.java | 3 +-
.../org/apache/doris/spark/rest/RestService.java | 9 +-
.../doris/spark/sql/DorisSourceProvider.scala | 72 +++++-------
.../apache/doris/spark/sql/DorisStreamWriter.scala | 122 +++++++++++++++++++++
.../doris/spark/sql/TestStreamSinkDoris.scala | 53 +++++++++
11 files changed, 418 insertions(+), 61 deletions(-)
diff --git a/docs/en/extending-doris/spark-doris-connector.md b/docs/en/extending-doris/spark-doris-connector.md
index 4735574..909db2a 100644
--- a/docs/en/extending-doris/spark-doris-connector.md
+++ b/docs/en/extending-doris/spark-doris-connector.md
@@ -26,9 +26,10 @@ under the License.
# Spark Doris Connector
-Spark Doris Connector can support reading data stored in Doris through Spark.
+Spark Doris Connector supports reading data stored in Doris and writing data to Doris through Spark.
-- The current version only supports reading data from `Doris`.
+- Support reading data from `Doris`.
+- Support batch and streaming writes of `Spark DataFrame` data to `Doris`.
- You can map the `Doris` table to a `DataFrame` or an `RDD`; using `DataFrame` is recommended.
- Support filtering data on the `Doris` side to reduce the amount of data transferred.
@@ -57,8 +58,9 @@ sh build.sh 2 ## spark 2.x version, the default is 2.3.4
After successful compilation, the file `doris-spark-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder; for `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package.
## Example
+### Read
-### SQL
+#### SQL
```sql
CREATE TEMPORARY VIEW spark_doris
@@ -73,7 +75,7 @@ OPTIONS(
SELECT * FROM spark_doris;
```
-### DataFrame
+#### DataFrame
```scala
val dorisSparkDF = spark.read.format("doris")
@@ -86,7 +88,7 @@ val dorisSparkDF = spark.read.format("doris")
dorisSparkDF.show(5)
```
-### RDD
+#### RDD
```scala
import org.apache.doris.spark._
@@ -101,6 +103,42 @@ val dorisSparkRDD = sc.dorisRDD(
dorisSparkRDD.collect()
```
+### Write
+#### DataFrame (batch/stream)
+```scala
+// batch sink
+val mockDataDF = List(
+ (3, "440403001005", "21.cn"),
+ (1, "4404030013005", "22.cn"),
+ (33, null, "23.cn")
+).toDF("id", "mi_code", "mi_name")
+mockDataDF.show(5)
+
+mockDataDF.write.format("doris")
+ .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+ .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+ .option("user", "$YOUR_DORIS_USERNAME")
+ .option("password", "$YOUR_DORIS_PASSWORD")
+ .save()
+
+// stream sink (Structured Streaming)
+val kafkaSource = spark.readStream
+ .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
+ .option("startingOffsets", "latest")
+ .option("subscribe", "$YOUR_KAFKA_TOPICS")
+ .format("kafka")
+ .load()
+kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .writeStream
+ .format("doris")
+ .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
+ .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+ .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+ .option("user", "$YOUR_DORIS_USERNAME")
+ .option("password", "$YOUR_DORIS_PASSWORD")
+ .start()
+ .awaitTermination()
+```
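+
+Under the hood, both the batch and stream sinks hand rows to Doris Stream Load in its default TSV form: fields joined with `\t`, rows with `\n`, and `null` written as `\N` (mirroring the `FIELD_DELIMITER`, `LINE_DELIMITER`, and `NULL_VALUE` constants in `DorisStreamLoad`). A minimal sketch of that serialization; the helper name `toTsv` is illustrative only:
+
+```scala
+// Sketch: how a buffered row batch becomes a Stream Load payload.
+def toTsv(rows: Seq[Seq[Any]]): String =
+  rows.map { row =>
+    row.map {
+      case null => "\\N"      // null sentinel understood by Doris
+      case f    => f.toString
+    }.mkString("\t")          // field delimiter
+  }.mkString("\n")            // line delimiter
+
+// e.g. toTsv(Seq(Seq(33, null, "23.cn"))) == "33\t\\N\t23.cn"
+```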
## Configuration
diff --git a/docs/zh-CN/extending-doris/spark-doris-connector.md b/docs/zh-CN/extending-doris/spark-doris-connector.md
index b8eb53e..fa69f38 100644
--- a/docs/zh-CN/extending-doris/spark-doris-connector.md
+++ b/docs/zh-CN/extending-doris/spark-doris-connector.md
@@ -26,9 +26,10 @@ under the License.
# Spark Doris Connector
-Spark Doris Connector can support reading data stored in Doris through Spark.
+Spark Doris Connector supports reading data stored in Doris and writing data to Doris through Spark.
-- The current version only supports reading data from `Doris`.
+- Support reading data from `Doris`.
+- Support batch and streaming writes of `Spark DataFrame` data to `Doris`.
- You can map the `Doris` table to a `DataFrame` or an `RDD`; using `DataFrame` is recommended.
- Support filtering data on the `Doris` side to reduce the amount of data transferred.
@@ -57,8 +58,9 @@ sh build.sh 2 ## spark 2.x version, the default is 2.3.4
After successful compilation, the file `doris-spark-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to the `ClassPath` of `Spark` to use `Spark-Doris-Connector`. For example, for `Spark` running in `Local` mode, put this file in the `jars/` folder; for `Spark` running in `Yarn` cluster mode, put this file in the pre-deployment package.
## Usage Example
+### Read
-### SQL
+#### SQL
```sql
CREATE TEMPORARY VIEW spark_doris
@@ -73,7 +75,7 @@ OPTIONS(
SELECT * FROM spark_doris;
```
-### DataFrame
+#### DataFrame
```scala
val dorisSparkDF = spark.read.format("doris")
@@ -86,7 +88,7 @@ val dorisSparkDF = spark.read.format("doris")
dorisSparkDF.show(5)
```
-### RDD
+#### RDD
```scala
import org.apache.doris.spark._
@@ -102,6 +104,47 @@ val dorisSparkRDD = sc.dorisRDD(
dorisSparkRDD.collect()
```
+### Write
+
+#### DataFrame (batch/stream)
+
+```scala
+// batch sink
+val mockDataDF = List(
+ (3, "440403001005", "21.cn"),
+ (1, "4404030013005", "22.cn"),
+ (33, null, "23.cn")
+).toDF("id", "mi_code", "mi_name")
+mockDataDF.show(5)
+
+mockDataDF.write.format("doris")
+ .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+ .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+ .option("user", "$YOUR_DORIS_USERNAME")
+ .option("password", "$YOUR_DORIS_PASSWORD")
+ .save()
+
+// stream sink (Structured Streaming)
+val kafkaSource = spark.readStream
+ .option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
+ .option("startingOffsets", "latest")
+ .option("subscribe", "$YOUR_KAFKA_TOPICS")
+ .format("kafka")
+ .load()
+kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .writeStream
+ .format("doris")
+ .option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
+ .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
+ .option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+ .option("user", "$YOUR_DORIS_USERNAME")
+ .option("password", "$YOUR_DORIS_PASSWORD")
+ .start()
+ .awaitTermination()
+```
+
+
+
## Configuration
### General configuration items
diff --git a/extension/spark-doris-connector/pom.xml b/extension/spark-doris-connector/pom.xml
index 5ba2c6e..e015f06 100644
--- a/extension/spark-doris-connector/pom.xml
+++ b/extension/spark-doris-connector/pom.xml
@@ -139,6 +139,12 @@
</exclusions>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
new file mode 100644
index 0000000..01cada4
--- /dev/null
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.DorisException;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * a cached Stream Load client; tasks with identical SparkSettings share one instance per JVM
+ */
+public class CachedDorisStreamLoadClient {
+ private static final long cacheExpireTimeout = 30 * 60;
+ private static LoadingCache<SparkSettings, DorisStreamLoad> dorisStreamLoadLoadingCache;
+
+ static {
+ dorisStreamLoadLoadingCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(cacheExpireTimeout, TimeUnit.SECONDS)
+ .removalListener(new RemovalListener<Object, Object>() {
+ @Override
+ public void onRemoval(RemovalNotification<Object, Object> removalNotification) {
+ //do nothing
+ }
+ })
+ .build(
+ new CacheLoader<SparkSettings, DorisStreamLoad>() {
+ @Override
+ public DorisStreamLoad load(SparkSettings sparkSettings) throws IOException, DorisException {
+ DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(sparkSettings);
+ return dorisStreamLoad;
+ }
+ }
+ );
+ }
+
+ public static DorisStreamLoad getOrCreate(SparkSettings settings) throws ExecutionException {
+ DorisStreamLoad dorisStreamLoad = dorisStreamLoadLoadingCache.get(settings);
+ return dorisStreamLoad;
+ }
+}
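Note that the cache is keyed by `SparkSettings`, not by partition: every task in a JVM whose settings compare equal (via the property-based `equals`/`hashCode` added to `Settings` below) reuses one `DorisStreamLoad` for up to 30 minutes. A minimal executor-side usage sketch, assuming the connector is on the classpath and `settings` is populated:

```scala
import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
import org.apache.doris.spark.cfg.SparkSettings
import java.util

// Hypothetical helper; mirrors what DorisStreamDataWriter does per partition.
def flush(settings: SparkSettings, rowsBuffer: util.List[util.List[Object]]): Unit = {
  // Identical settings hit the same cache entry, so the HTTP endpoint and
  // Basic auth header are built once, not per task.
  val loader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
  loader.load(rowsBuffer)
}
```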
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index 0de3746..dcf569f 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -17,7 +17,11 @@
package org.apache.doris.spark;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.SparkSettings;
+import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.StreamLoadException;
+import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.RespContent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,12 +40,16 @@ import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.List;
+import java.util.StringJoiner;
import java.util.UUID;
/**
* DorisStreamLoad
**/
public class DorisStreamLoad implements Serializable{
+ public static final String FIELD_DELIMITER = "\t";
+ public static final String LINE_DELIMITER = "\n";
+ public static final String NULL_VALUE = "\\N";
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
@@ -65,6 +73,18 @@ public class DorisStreamLoad implements Serializable{
this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
}
+ public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
+ String hostPort = RestService.randomBackend(settings, LOG);
+ this.hostPort = hostPort;
+ String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
+ this.db = dbTable[0];
+ this.tbl = dbTable[1];
+ this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
+ this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
+ this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
+ this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+ }
+
public String getLoadUrlStr() {
return loadUrlStr;
}
@@ -84,7 +104,6 @@ public class DorisStreamLoad implements Serializable{
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
- String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
@@ -114,6 +133,22 @@ public class DorisStreamLoad implements Serializable{
}
}
+ public void load(List<List<Object>> rows) throws StreamLoadException {
+ StringJoiner lines = new StringJoiner(LINE_DELIMITER);
+ for (List<Object> row : rows) {
+ StringJoiner line = new StringJoiner(FIELD_DELIMITER);
+ for (Object field : row) {
+ if (field == null) {
+ line.add(NULL_VALUE);
+ } else {
+ line.add(field.toString());
+ }
+ }
+ lines.add(line.toString());
+ }
+ load(lines.toString());
+ }
+
public void load(String value) throws StreamLoadException {
LoadResponse loadResponse = loadBatch(value);
LOG.info("Streamload Response:{}",loadResponse);
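The new constructor gathers everything the request needs up front: a random BE from `RestService.randomBackend`, the database and table split out of `doris.table.identifier`, and a pre-encoded Basic auth header. Assuming `loadUrlPattern` is the standard Doris Stream Load endpoint (it is defined elsewhere in this class and not shown in this hunk), the resulting URL has this shape:

```scala
// Assumed shape of loadUrlStr; hostPort is the chosen BE's HTTP address.
val loadUrlStr = s"http://$hostPort/api/$db/$tbl/_stream_load"
```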
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
index 4677de7..4e376a4 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
@@ -98,4 +98,17 @@ public abstract class Settings {
Properties copy = asProperties();
return IOUtils.propsToString(copy);
}
+
+ @Override
+ public int hashCode() {
+ return asProperties().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof Settings)) {
+ return false;
+ }
+ return asProperties().equals(((Settings) obj).asProperties());
+ }
}
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
index 6568485..39fcd75 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
@@ -24,9 +24,10 @@ import org.apache.spark.SparkConf;
import com.google.common.base.Preconditions;
import scala.Option;
+import scala.Serializable;
import scala.Tuple2;
-public class SparkSettings extends Settings {
+public class SparkSettings extends Settings implements Serializable {
private final SparkConf cfg;
diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 10126e8..0c9b5c4 100644
--- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -65,7 +65,6 @@ import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
-import org.apache.doris.spark.sql.DorisWriterOption;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
@@ -476,17 +475,13 @@ public class RestService implements Serializable {
/**
* choose a Doris BE node to request.
- * @param options configuration of request
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
- public static String randomBackend(SparkSettings sparkSettings , DorisWriterOption options , Logger logger) throws DorisException, IOException {
- // set user auth
- sparkSettings.setProperty(DORIS_REQUEST_AUTH_USER,options.user());
- sparkSettings.setProperty(DORIS_REQUEST_AUTH_PASSWORD,options.password());
- String feNodes = options.feHostPort();
+ public static String randomBackend(SparkSettings sparkSettings, Logger logger) throws DorisException, IOException {
+ String feNodes = sparkSettings.getProperty(DORIS_FENODES);
String feNode = randomEndpoint(feNodes, logger);
String beUrl = String.format("http://%s" + BACKENDS,feNode);
HttpGet httpGet = new HttpGet(beUrl);
diff --git a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index d8f951b..65f5250 100644
--- a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -17,25 +17,24 @@
package org.apache.doris.spark.sql
-import java.io.IOException
-import java.util.StringJoiner
-
-import org.apache.commons.collections.CollectionUtils
import org.apache.doris.spark.DorisStreamLoad
-import org.apache.doris.spark.cfg.SparkSettings
-import org.apache.doris.spark.exception.DorisException
-import org.apache.doris.spark.rest.RestService
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister, Filter, RelationProvider}
-import org.apache.spark.sql.types.StructType
-import org.json4s.jackson.Json
-import scala.collection.mutable.ListBuffer
-import scala.util.Random
+import java.io.IOException
+import java.util
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.util.control.Breaks
-private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with Logging {
+private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider with CreatableRelationProvider with StreamWriteSupport with Logging {
override def shortName(): String = "doris"
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
@@ -50,41 +49,29 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
mode: SaveMode, parameters: Map[String, String],
data: DataFrame): BaseRelation = {
- val dorisWriterOption = DorisWriterOption(parameters)
val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
- // choose available be node
- val choosedBeHost = RestService.randomBackend(sparkSettings, dorisWriterOption, log)
+ sparkSettings.merge(Utils.params(parameters, log).asJava)
// init stream loader
- val dorisStreamLoader = new DorisStreamLoad(choosedBeHost, dorisWriterOption.dbName, dorisWriterOption.tbName, dorisWriterOption.user, dorisWriterOption.password)
- val fieldDelimiter: String = "\t"
- val lineDelimiter: String = "\n"
- val NULL_VALUE: String = "\\N"
+ val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
- val maxRowCount = dorisWriterOption.maxRowCount
- val maxRetryTimes = dorisWriterOption.maxRetryTimes
+ val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
+ val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
data.rdd.foreachPartition(partition => {
-
- val buffer = ListBuffer[String]()
+ val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
partition.foreach(row => {
- val value = new StringJoiner(fieldDelimiter)
- // create one row string
+ val line: util.List[Object] = new util.ArrayList[Object]()
for (i <- 0 until row.size) {
val field = row.get(i)
- if (field == null) {
- value.add(NULL_VALUE)
- } else {
- value.add(field.toString)
- }
+ line.add(field.asInstanceOf[AnyRef])
}
- // add one row string to buffer
- buffer += value.toString
- if (buffer.size > maxRowCount) {
+ rowsBuffer.add(line)
+ if (rowsBuffer.size >= maxRowCount) {
flush
}
})
// flush buffer
- if (buffer.nonEmpty) {
+ if (!rowsBuffer.isEmpty) {
flush
}
@@ -98,16 +85,16 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
for (i <- 1 to maxRetryTimes) {
try {
- dorisStreamLoader.load(buffer.mkString(lineDelimiter))
- buffer.clear()
+ dorisStreamLoader.load(rowsBuffer)
+ rowsBuffer.clear()
loop.break()
}
catch {
case e: Exception =>
try {
Thread.sleep(1000 * i)
- dorisStreamLoader.load(buffer.mkString(lineDelimiter))
- buffer.clear()
+ dorisStreamLoader.load(rowsBuffer)
+ rowsBuffer.clear()
} catch {
case ex: InterruptedException =>
Thread.currentThread.interrupt()
@@ -136,8 +123,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister with RelationP
}
}
-
-
-
-
+ override def createStreamWriter(queryId: String, structType: StructType, outputMode: OutputMode, dataSourceOptions: DataSourceOptions): StreamWriter = {
+ val sparkSettings = new SparkSettings(new SparkConf())
+ sparkSettings.merge(Utils.params(dataSourceOptions.asMap().toMap, log).asJava)
+ new DorisStreamWriter(sparkSettings)
+ }
}
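With `StreamWriteSupport` implemented alongside `CreatableRelationProvider`, the same provider now serves both write paths; Spark finds it through the `DataSourceRegister` service loader by matching `shortName() == "doris"`. A rough sketch of the dispatch (framework behavior, not connector code):

```scala
// Batch:  df.write.format("doris").options(opts).save()
//   -> DorisSourceProvider.createRelation(sqlContext, mode, params, data)
// Stream: df.writeStream.format("doris").options(opts).start()
//   -> DorisSourceProvider.createStreamWriter(queryId, schema, outputMode, options)
```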
diff --git a/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala
new file mode 100644
index 0000000..60d2c78
--- /dev/null
+++ b/extension/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamWriter.scala
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
+import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
+
+import java.io.IOException
+import java.util
+import scala.util.control.Breaks
+
+/**
+ * A [[StreamWriter]] for Apache Doris streaming writing.
+ *
+ * @param settings parameters for writing to the Doris table
+ */
+class DorisStreamWriter(settings: SparkSettings) extends StreamWriter {
+ override def createWriterFactory(): DorisStreamWriterFactory = DorisStreamWriterFactory(settings)
+
+ override def commit(l: Long, writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
+
+ override def abort(l: Long, writerCommitMessages: Array[WriterCommitMessage]): Unit = {}
+
+}
+
+/**
+ * A [[DataWriterFactory]] for Apache Doris streaming writing. Will be serialized and sent to executors to generate
+ * the per-task data writers.
+ *
+ * @param settings parameters for writing to the Doris table
+ */
+case class DorisStreamWriterFactory(settings: SparkSettings) extends DataWriterFactory[Row] {
+ override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+ new DorisStreamDataWriter(settings)
+ }
+}
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't actually need to send one.
+ */
+case object DorisWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[DataWriter]] for Apache Doris streaming writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param settings parameters for writing to the Doris table
+ */
+class DorisStreamDataWriter(settings: SparkSettings) extends DataWriter[Row] {
+ val maxRowCount: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_BATCH_SIZE, ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT)
+ val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT)
+ val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
+ val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount)
+
+ override def write(row: Row): Unit = {
+ val line: util.List[Object] = new util.ArrayList[Object]()
+ for (i <- 0 until row.size) {
+ val field = row.get(i)
+ line.add(field.asInstanceOf[AnyRef])
+ }
+ if (!line.isEmpty) {
+ rowsBuffer.add(line)
+ }
+ if (rowsBuffer.size >= maxRowCount) {
+ // commit when buffer is full
+ commit()
+ }
+ }
+
+ override def commit(): WriterCommitMessage = {
+ // only issue a Stream Load request if the buffer has received rows
+ val loop = new Breaks
+ loop.breakable {
+ for (i <- 1 to maxRetryTimes) {
+ try {
+ if (!rowsBuffer.isEmpty) {
+ dorisStreamLoader.load(rowsBuffer)
+ }
+ rowsBuffer.clear()
+ loop.break()
+ }
+ catch {
+ case e: Exception =>
+ try {
+ Thread.sleep(1000 * i)
+ if (!rowsBuffer.isEmpty) {
+ dorisStreamLoader.load(rowsBuffer)
+ }
+ rowsBuffer.clear()
+ } catch {
+ case ex: InterruptedException =>
+ Thread.currentThread.interrupt()
+ throw new IOException("unable to flush; interrupted while doing another attempt", e)
+ }
+ }
+ }
+ }
+ DorisWriterCommitMessage
+ }
+
+ override def abort(): Unit = {
+ }
+}
\ No newline at end of file
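Per micro-batch, Spark drives these classes through the DataSourceV2 contract: the driver ships `DorisStreamWriterFactory` to executors, each partition gets its own `DorisStreamDataWriter`, rows are flushed to Stream Load whenever the buffer reaches `maxRowCount`, and `commit()` flushes the remainder with up to `maxRetryTimes` attempts and linear backoff. A simplified lifecycle sketch (framework pseudocode for one partition):

```scala
// val writer = factory.createDataWriter(partitionId, attemptNumber)
// rowsOfPartition.foreach(writer.write)  // buffers; flushes at maxRowCount
// val msg = writer.commit()              // flushes remainder, returns DorisWriterCommitMessage
// driver: streamWriter.commit(epochId, Array(msg))  // no-op; data already loaded
```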
diff --git a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
new file mode 100644
index 0000000..c62c9fb
--- /dev/null
+++ b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestStreamSinkDoris.scala
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.sql
+
+import org.apache.spark.sql.SparkSession
+
+object TestStreamSinkDoris {
+ val kafkaServers = ""
+ val kafkaTopics = ""
+ val dorisFeNodes = "your_doris_host_port"
+ val dorisUser = "root"
+ val dorisPwd = ""
+ val dorisTable = "test.test_tbl"
+
+ def main(args: Array[String]): Unit = {
+ val sparkSession = SparkSession.builder()
+ .master("local")
+ .getOrCreate()
+
+ val dataFrame = sparkSession.readStream
+ .option("kafka.bootstrap.servers", kafkaServers)
+ .option("startingOffsets", "latest")
+ .option("subscribe", kafkaTopics)
+ .format("kafka")
+ .option("failOnDataLoss", false)
+ .load()
+
+ dataFrame.selectExpr("CAST(timestamp AS STRING)", "CAST(partition as STRING)")
+ .writeStream
+ .format("doris")
+ .option("checkpointLocation", "/tmp/test")
+ .option("doris.table.identifier", dorisTable)
+ .option("doris.fenodes", dorisFeNodes)
+ .option("user", dorisUser)
+ .option("password", dorisPwd)
+ .start().awaitTermination()
+ }
+}