Posted to commits@doris.apache.org by di...@apache.org on 2022/04/14 01:48:50 UTC
[incubator-doris-spark-connector] branch master updated: [improvement] stream load data is converted to json format (#15)
This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new b80046d [improvement] stream load data is converted to json format (#15)
b80046d is described below
commit b80046d55682b7760d8efbf8b66337cceb369ed8
Author: smallhibiscus <84...@qq.com>
AuthorDate: Thu Apr 14 09:48:46 2022 +0800
[improvement] stream load data is converted to json format (#15)
* [improvement] stream load data is converted to json format
* Add unit tests and add a keysType property to Schema.java
* Modify the Doris sink to read Kafka data only in JSON object format
* format code
Co-authored-by: smallhibiscus <844981280>
---
.../org/apache/doris/spark/DorisStreamLoad.java | 49 ++++++++++--
.../org/apache/doris/spark/rest/models/Schema.java | 10 +++
.../doris/spark/sql/DorisSourceProvider.scala | 8 +-
.../doris/spark/sql/DorisStreamLoadSink.scala | 24 +++---
.../doris/spark/sql/TestConnectorWriteDoris.scala | 89 ++++++++++++++++++++++
5 files changed, 157 insertions(+), 23 deletions(-)
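
Before this change, a batch was serialized by listToString: fields joined with a field delimiter and rows joined with a line delimiter, which breaks whenever a value itself contains one of the delimiter characters. With this patch the batch is serialized as a JSON array keyed by the DataFrame's column names, and the stream load connection is tagged with the "format"="json" and "strip_outer_array"="true" request properties so Doris parses the body as JSON rows. A minimal standalone sketch of the conversion, assuming only that Jackson is on the classpath (the class and method names below are illustrative, not connector API):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowsToJsonSketch {

    // Convert positional rows plus column names into the JSON array body
    // that stream load consumes when strip_outer_array=true.
    public static String toJson(String[] columns, List<List<Object>> rows)
            throws JsonProcessingException {
        List<Map<String, Object>> data = new ArrayList<>();
        for (List<Object> row : rows) {
            Map<String, Object> record = new LinkedHashMap<>();
            for (int i = 0; i < columns.length; i++) {
                record.put(columns[i], row.get(i));
            }
            data.add(record);
        }
        return new ObjectMapper().writeValueAsString(data);
    }

    public static void main(String[] args) throws JsonProcessingException {
        String[] cols = {"order_id", "order_amount", "order_status"};
        List<List<Object>> rows = Arrays.asList(
                Arrays.asList("1", 100, "pending"),
                Arrays.asList("2", 200, "shipped"));
        System.out.println(toJson(cols, rows));
        // [{"order_id":"1","order_amount":100,"order_status":"pending"},
        //  {"order_id":"2","order_amount":200,"order_status":"shipped"}]
    }
}
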
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index b74e0a8..db3ef7d 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -16,6 +16,7 @@
// under the License.
package org.apache.doris.spark;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.SparkSettings;
@@ -35,13 +36,16 @@ import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Base64;
-import java.util.Calendar;
import java.util.List;
import java.util.StringJoiner;
import java.util.UUID;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Calendar;
+
/**
* DorisStreamLoad
@@ -63,6 +67,7 @@ public class DorisStreamLoad implements Serializable{
private String tbl;
private String authEncoding;
private String columns;
+ private String[] dfColumns;
public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
this.hostPort = hostPort;
@@ -87,6 +92,20 @@ public class DorisStreamLoad implements Serializable{
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
}
+ public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
+ String hostPort = RestService.randomBackendV2(settings, LOG);
+ this.hostPort = hostPort;
+ String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
+ this.db = dbTable[0];
+ this.tbl = dbTable[1];
+ this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
+ this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
+ this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
+ this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+ this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
+ this.dfColumns = dfColumns;
+ }
+
public String getLoadUrlStr() {
return loadUrlStr;
}
@@ -115,6 +134,8 @@ public class DorisStreamLoad implements Serializable{
}
conn.setDoOutput(true);
conn.setDoInput(true);
+ conn.addRequestProperty("format", "json");
+ conn.addRequestProperty("strip_outer_array", "true");
return conn;
}
@@ -138,7 +159,7 @@ public class DorisStreamLoad implements Serializable{
}
}
- public String listToString(List<List<Object>> rows){
+ public String listToString(List<List<Object>> rows) {
StringJoiner lines = new StringJoiner(LINE_DELIMITER);
for (List<Object> row : rows) {
StringJoiner line = new StringJoiner(FIELD_DELIMITER);
@@ -155,10 +176,24 @@ public class DorisStreamLoad implements Serializable{
}
- public void load(List<List<Object>> rows) throws StreamLoadException {
- String records = listToString(rows);
- load(records);
+ public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
+ List<Map<Object,Object>> dataList = new ArrayList<>();
+ try {
+ for (List<Object> row : rows) {
+ Map<Object,Object> dataMap = new HashMap<>();
+ if (dfColumns.length == row.size()) {
+ for (int i = 0; i < dfColumns.length; i++) {
+ dataMap.put(dfColumns[i], row.get(i));
+ }
+ }
+ dataList.add(dataMap);
+ }
+ } catch (Exception e) {
+ throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
+ }
+ load((new ObjectMapper()).writeValueAsString(dataList));
}
+
public void load(String value) throws StreamLoadException {
LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
LoadResponse loadResponse = loadBatch(value);
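
One caveat in loadV2 above: when a row's size differs from dfColumns.length, the if guard silently skips the copy and an empty map is still appended to dataList, so the "number of configured columns" message in the catch block can only be reached through some other runtime exception. A stricter variant that fails fast on the mismatch could look like the sketch below (loadV2Strict is a hypothetical name; dfColumns, StreamLoadException, and load(String) are the names from the diff, so this would be a drop-in method for the same class):

public void loadV2Strict(List<List<Object>> rows)
        throws StreamLoadException, JsonProcessingException {
    List<Map<Object, Object>> dataList = new ArrayList<>();
    for (List<Object> row : rows) {
        // Fail fast instead of silently emitting an empty JSON record.
        if (dfColumns.length != row.size()) {
            throw new StreamLoadException("The number of configured columns ("
                    + dfColumns.length + ") does not match the number of data columns ("
                    + row.size() + ").");
        }
        Map<Object, Object> dataMap = new HashMap<>();
        for (int i = 0; i < dfColumns.length; i++) {
            dataMap.put(dfColumns[i], row.get(i));
        }
        dataList.add(dataMap);
    }
    load(new ObjectMapper().writeValueAsString(dataList));
}
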
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
index 0a437bf..a236325 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
@@ -23,6 +23,7 @@ import java.util.Objects;
public class Schema {
private int status = 0;
+ private String keysType;
private List<Field> properties;
@@ -42,6 +43,14 @@ public class Schema {
this.status = status;
}
+ public String getKeysType() {
+ return keysType;
+ }
+
+ public void setKeysType(String keysType) {
+ this.keysType = keysType;
+ }
+
public List<Field> getProperties() {
return properties;
}
@@ -99,6 +108,7 @@ public class Schema {
public String toString() {
return "Schema{" +
"status=" + status +
+ ", keysType='" + keysType +
", properties=" + properties +
'}';
}
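
With keysType carried on the model, the schema JSON returned by the FE's REST endpoint can be deserialized directly. A minimal sketch assuming Jackson and a response shaped like the literal below; the concrete keysType value (DUP_KEYS) and the payload shape are illustrative assumptions, not taken from this commit:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.spark.rest.models.Schema;

public class SchemaParseSketch {
    public static void main(String[] args) throws Exception {
        String json = "{\"status\":200,\"keysType\":\"DUP_KEYS\",\"properties\":[]}";
        ObjectMapper mapper = new ObjectMapper()
                // tolerate fields the Schema model does not declare
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        Schema schema = mapper.readValue(json, Schema.class);
        System.out.println(schema.getKeysType()); // DUP_KEYS
    }
}
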
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 9b7d3f0..58c974c 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -60,7 +60,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
sparkSettings.merge(Utils.params(parameters, logger).asJava)
// init stream loader
- val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
+ val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns)
val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
@@ -93,7 +93,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
for (i <- 1 to maxRetryTimes) {
try {
- dorisStreamLoader.load(rowsBuffer)
+ dorisStreamLoader.loadV2(rowsBuffer)
rowsBuffer.clear()
loop.break()
}
@@ -102,7 +102,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
try {
logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
//If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
- dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings,logger))
+ dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings, logger))
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
@@ -113,7 +113,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
}
}
- if(!rowsBuffer.isEmpty){
+ if (!rowsBuffer.isEmpty) {
logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
}
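
The loop above is the connector's failover strategy: each failed flush switches to another randomly chosen BE node (RestService.randomBackendV2) and backs off linearly (1s, 2s, ...) before the next attempt. Distilled into a self-contained Java sketch, with hypothetical Batch and Failover callbacks standing in for dorisStreamLoader.loadV2 and the backend switch:

import java.io.IOException;

public class RetrySketch {

    interface Batch { void load() throws Exception; }
    interface Failover { void switchBackend() throws Exception; }

    static void loadWithRetry(Batch batch, Failover failover, int maxRetryTimes)
            throws IOException {
        for (int i = 1; i <= maxRetryTimes; i++) {
            try {
                batch.load();
                return; // success, stop retrying
            } catch (Exception e) {
                try {
                    failover.switchBackend(); // move to a different BE node
                    Thread.sleep(1000L * i);  // linear backoff: 1s, 2s, 3s, ...
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IOException("unable to flush; interrupted while doing another attempt", e);
                } catch (Exception switchFailure) {
                    // backend re-selection failed; fall through and retry anyway
                }
            }
        }
        throw new IOException("Failed to load data and exceeded the max retry times.");
    }
}
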
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 6e73698..a1fcbf8 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,6 +17,7 @@
package org.apache.doris.spark.sql
+import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
import org.apache.spark.sql.execution.QueryExecution
@@ -25,7 +26,6 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
import org.slf4j.{Logger, LoggerFactory}
import java.io.IOException
import java.util
-
import org.apache.doris.spark.rest.RestService
import scala.util.control.Breaks
@@ -49,20 +49,20 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
def write(queryExecution: QueryExecution): Unit = {
queryExecution.toRdd.foreachPartition(iter => {
- val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]]()
+ val objectMapper = new ObjectMapper()
+ val arrayNode = objectMapper.createArrayNode()
iter.foreach(row => {
val line: util.List[Object] = new util.ArrayList[Object](maxRowCount)
for (i <- 0 until row.numFields) {
val field = row.copy().getUTF8String(i)
- line.add(field.asInstanceOf[AnyRef])
+ arrayNode.add(objectMapper.readTree(field.toString))
}
- rowsBuffer.add(line)
- if (rowsBuffer.size > maxRowCount - 1) {
+ if (arrayNode.size > maxRowCount - 1) {
flush
}
})
// flush buffer
- if (!rowsBuffer.isEmpty) {
+ if (!arrayNode.isEmpty) {
flush
}
@@ -76,8 +76,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
for (i <- 0 to maxRetryTimes) {
try {
- dorisStreamLoader.load(rowsBuffer)
- rowsBuffer.clear()
+ dorisStreamLoader.load(arrayNode.toString)
+ arrayNode.removeAll()
loop.break()
}
catch {
@@ -85,19 +85,19 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
try {
logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
//If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
- dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings,logger))
+ dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings, logger))
Thread.sleep(1000 * i)
} catch {
case ex: InterruptedException =>
- logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
+ logger.warn("Data that failed to load : " + arrayNode.toString)
Thread.currentThread.interrupt()
throw new IOException("unable to flush; interrupted while doing another attempt", e)
}
}
}
- if(!rowsBuffer.isEmpty){
- logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
+ if (!arrayNode.isEmpty) {
+ logger.warn("Data that failed to load : " + arrayNode.toString)
throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
}
}
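
Note the behavioral change in the streaming sink: every field of the incoming row is now parsed with objectMapper.readTree, so Kafka record values must be valid JSON objects (the "read kafka only with jsonobject format" item in the commit message), and non-JSON values will fail at parse time. Also, the line buffer created at the top of the loop belongs to the old list-based path and is now dead code. A minimal Jackson sketch of the new batching scheme, with System.out.println standing in for the stream load flush:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class JsonBatchSketch {
    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        ArrayNode batch = mapper.createArrayNode();
        int maxRowCount = 2; // sink.batch.size
        List<String> kafkaValues = Arrays.asList(
                "{\"order_id\":\"1\",\"order_amount\":100}",
                "{\"order_id\":\"2\",\"order_amount\":200}",
                "{\"order_id\":\"3\",\"order_amount\":300}");
        for (String value : kafkaValues) {
            batch.add(mapper.readTree(value)); // throws if value is not valid JSON
            if (batch.size() >= maxRowCount) {
                System.out.println("flush: " + batch); // stream load call goes here
                batch.removeAll();
            }
        }
        if (!batch.isEmpty()) {
            System.out.println("flush: " + batch); // flush the remainder
        }
    }
}
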
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
new file mode 100644
index 0000000..999a92c
--- /dev/null
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
@@ -0,0 +1,89 @@
+package org.apache.doris.spark.sql
+
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class TestConnectorWriteDoris {
+
+ val dorisFeNodes = "127.0.0.1:8030"
+ val dorisUser = "root"
+ val dorisPwd = ""
+ val dorisTable = "test.test_order"
+
+ val kafkaServers = "127.0.0.1:9093"
+ val kafkaTopics = "test_spark"
+
+ @Test
+ def listDataWriteTest(): Unit = {
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ val df = spark.createDataFrame(Seq(
+ ("1", 100, "待付款"),
+ ("2", 200, "待发货"),
+ ("3", 300, "已收货")
+ )).toDF("order_id", "order_amount", "order_status")
+ df.write
+ .format("doris")
+ .option("doris.fenodes", dorisFeNodes)
+ .option("doris.table.identifier", dorisTable)
+ .option("user", dorisUser)
+ .option("password", dorisPwd)
+ .option("sink.batch.size", 2)
+ .option("sink.max-retries", 2)
+ .save()
+ spark.stop()
+ }
+
+
+ @Test
+ def csvDataWriteTest(): Unit = {
+ val spark = SparkSession.builder().master("local[*]").getOrCreate()
+ val df = spark.read
+ .option("header", "true") // uses the first line as names of columns
+ .option("inferSchema", "true") // infers the input schema automatically from data
+ .csv("data.csv")
+ df.createTempView("tmp_tb")
+ val doris = spark.sql(
+ """
+ |create TEMPORARY VIEW test_lh
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="test.test_lh",
+ | "fenodes"="127.0.0.1:8030",
+ | "user"="root",
+ | "password"=""
+ |);
+ |""".stripMargin)
+ spark.sql(
+ """
+ |insert into test_lh select name, gender, age from tmp_tb;
+ |""".stripMargin)
+ spark.stop()
+ }
+
+ @Test
+ def structuredStreamingWriteTest(): Unit = {
+ val spark = SparkSession.builder()
+ .master("local")
+ .getOrCreate()
+ val df = spark.readStream
+ .option("kafka.bootstrap.servers", kafkaServers)
+ .option("startingOffsets", "latest")
+ .option("subscribe", kafkaTopics)
+ .format("kafka")
+ .option("failOnDataLoss", false)
+ .load()
+
+ df.selectExpr("CAST(value AS STRING)")
+ .writeStream
+ .format("doris")
+ .option("checkpointLocation", "/tmp/test")
+ .option("doris.table.identifier", dorisTable)
+ .option("doris.fenodes", dorisFeNodes)
+ .option("user", dorisUser)
+ .option("password", dorisPwd)
+ .option("sink.batch.size", 2)
+ .option("sink.max-retries", 2)
+ .start().awaitTermination()
+ }
+
+}