Posted to commits@doris.apache.org by di...@apache.org on 2022/04/14 01:48:50 UTC

[incubator-doris-spark-connector] branch master updated: [improvement] stream load data is converted to json format (#15)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new b80046d  [improvement] stream load data is converted to json format (#15)
b80046d is described below

commit b80046d55682b7760d8efbf8b66337cceb369ed8
Author: smallhibiscus <84...@qq.com>
AuthorDate: Thu Apr 14 09:48:46 2022 +0800

    [improvement] stream load data is converted to json format (#15)
    
    * [improvement] stream load data is converted to json format
    
    * Add unit tests and add a keysType property to Schema.java
    
    * Modify the Kafka read path to accept JSON object format only
    
    * format code
    
    Co-authored-by: smallhibiscus <844981280>
---
 .../org/apache/doris/spark/DorisStreamLoad.java    | 49 ++++++++++--
 .../org/apache/doris/spark/rest/models/Schema.java | 10 +++
 .../doris/spark/sql/DorisSourceProvider.scala      |  8 +-
 .../doris/spark/sql/DorisStreamLoadSink.scala      | 24 +++---
 .../doris/spark/sql/TestConnectorWriteDoris.scala  | 89 ++++++++++++++++++++++
 5 files changed, 157 insertions(+), 23 deletions(-)
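
In short, the connector now serializes each batch of rows as a JSON array of
column-name/value objects rather than a delimiter-joined text block. A minimal
sketch of that mapping (column names and values are hypothetical, borrowed from
the test added below; the connector itself does this in DorisStreamLoad.loadV2
via Jackson):

import com.fasterxml.jackson.databind.ObjectMapper
import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}

object RowToJsonSketch {
  // Mirrors loadV2: one map per row, keyed by the DataFrame column names.
  def toJson(dfColumns: Array[String], rows: JList[JList[Object]]): String = {
    val dataList = new JArrayList[JMap[Object, Object]]()
    rows.forEach { row =>
      require(dfColumns.length == row.size(), "column count mismatch")
      val dataMap = new JHashMap[Object, Object]()
      for (i <- dfColumns.indices) dataMap.put(dfColumns(i), row.get(i))
      dataList.add(dataMap)
    }
    // e.g. [{"order_id":"1","order_amount":100,"order_status":"pending payment"}]
    new ObjectMapper().writeValueAsString(dataList)
  }
}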

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
index b74e0a8..db3ef7d 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.doris.spark;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.doris.spark.cfg.ConfigurationOptions;
 import org.apache.doris.spark.cfg.SparkSettings;
@@ -35,13 +36,16 @@ import java.io.Serializable;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Base64;
-import java.util.Calendar;
 import java.util.List;
 import java.util.StringJoiner;
 import java.util.UUID;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Calendar;
+
 
 /**
  * DorisStreamLoad
@@ -63,6 +67,7 @@ public class DorisStreamLoad implements Serializable{
     private String tbl;
     private String authEncoding;
     private String columns;
+    private String[] dfColumns;
 
     public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
         this.hostPort = hostPort;
@@ -87,6 +92,20 @@ public class DorisStreamLoad implements Serializable{
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
     }
 
+    public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOException, DorisException {
+        String hostPort = RestService.randomBackendV2(settings, LOG);
+        this.hostPort = hostPort;
+        String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
+        this.db = dbTable[0];
+        this.tbl = dbTable[1];
+        this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
+        this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
+        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
+        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
+        this.dfColumns = dfColumns;
+    }
+
     public String getLoadUrlStr() {
         return loadUrlStr;
     }
@@ -115,6 +134,8 @@ public class DorisStreamLoad implements Serializable{
         }
         conn.setDoOutput(true);
         conn.setDoInput(true);
+        conn.addRequestProperty("format", "json");
+        conn.addRequestProperty("strip_outer_array", "true");
         return conn;
     }
 
@@ -138,7 +159,7 @@ public class DorisStreamLoad implements Serializable{
         }
     }
 
-    public String listToString(List<List<Object>> rows){
+    public String listToString(List<List<Object>> rows) {
         StringJoiner lines = new StringJoiner(LINE_DELIMITER);
         for (List<Object> row : rows) {
             StringJoiner line = new StringJoiner(FIELD_DELIMITER);
@@ -155,10 +176,24 @@ public class DorisStreamLoad implements Serializable{
     }
 
 
-    public void load(List<List<Object>> rows) throws StreamLoadException {
-        String records = listToString(rows);
-        load(records);
+    public void loadV2(List<List<Object>> rows) throws StreamLoadException, JsonProcessingException {
+        // Convert each row into a map keyed by the DataFrame column names, then send the batch as a JSON array.
+        List<Map<Object, Object>> dataList = new ArrayList<>();
+        for (List<Object> row : rows) {
+            if (dfColumns.length != row.size()) {
+                // Fail fast instead of silently emitting an empty JSON object for the row.
+                throw new StreamLoadException(
+                        "The number of configured columns does not match the number of data columns.");
+            }
+            Map<Object, Object> dataMap = new HashMap<>();
+            for (int i = 0; i < dfColumns.length; i++) {
+                dataMap.put(dfColumns[i], row.get(i));
+            }
+            dataList.add(dataMap);
+        }
+        load((new ObjectMapper()).writeValueAsString(dataList));
     }
+
     public void load(String value) throws StreamLoadException {
         LOG.debug("Streamload Request:{} ,Body:{}", loadUrlStr, value);
         LoadResponse loadResponse = loadBatch(value);
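
The two addRequestProperty calls above are what tell Doris how to parse the
payload: format=json switches Stream Load from its CSV-style default to JSON,
and strip_outer_array=true makes it treat the body's outer [...] as a list of
rows. A hedged sketch of the equivalent raw call (host, database, table, and
credentials are placeholders taken from the test below; a real Stream Load goes
through the FE, which may redirect the request to a BE):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.Base64

object StreamLoadSketch {
  def main(args: Array[String]): Unit = {
    val conn = new URL("http://127.0.0.1:8030/api/test/test_order/_stream_load")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("PUT")
    conn.setDoOutput(true)
    val auth = Base64.getEncoder.encodeToString("root:".getBytes(StandardCharsets.UTF_8))
    conn.setRequestProperty("Authorization", "Basic " + auth)
    // The two headers this commit adds: parse the body as a JSON array of row objects.
    conn.addRequestProperty("format", "json")
    conn.addRequestProperty("strip_outer_array", "true")
    val body = """[{"order_id":"1","order_amount":100,"order_status":"pending payment"}]"""
    conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))
    println(s"HTTP ${conn.getResponseCode}")
  }
}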
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
index 0a437bf..a236325 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Schema.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 
 public class Schema {
     private int status = 0;
+    private String keysType;
     private List<Field> properties;
 
@@ -42,6 +43,14 @@ public class Schema {
         this.status = status;
     }
 
+    public String getKeysType() {
+        return keysType;
+    }
+
+    public void setKeysType(String keysType) {
+        this.keysType = keysType;
+    }
+
     public List<Field> getProperties() {
         return properties;
     }
@@ -99,6 +108,7 @@ public class Schema {
     public String toString() {
         return "Schema{" +
                 "status=" + status +
+                ", keysType='" + keysType + '\'' +
                 ", properties=" + properties +
                 '}';
     }
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 9b7d3f0..58c974c 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -60,7 +60,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
     val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf)
     sparkSettings.merge(Utils.params(parameters, logger).asJava)
     // init stream loader
-    val dorisStreamLoader = new DorisStreamLoad(sparkSettings)
+    val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns)
 
     val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT)
     val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
@@ -93,7 +93,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
 
           for (i <- 1 to maxRetryTimes) {
             try {
-              dorisStreamLoader.load(rowsBuffer)
+              dorisStreamLoader.loadV2(rowsBuffer)
               rowsBuffer.clear()
               loop.break()
             }
@@ -102,7 +102,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
                 try {
                   logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
                   //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
-                  dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings,logger))
+                  dorisStreamLoader.setHostPort(RestService.randomBackendV2(sparkSettings, logger))
                   Thread.sleep(1000 * i)
                 } catch {
                   case ex: InterruptedException =>
@@ -113,7 +113,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
             }
           }
 
-          if(!rowsBuffer.isEmpty){
+          if (!rowsBuffer.isEmpty) {
             logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
             throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
           }
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 6e73698..a1fcbf8 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -17,6 +17,7 @@
 
 package org.apache.doris.spark.sql
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
 import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad}
 import org.apache.spark.sql.execution.QueryExecution
@@ -25,7 +26,6 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.slf4j.{Logger, LoggerFactory}
 import java.io.IOException
 import java.util
-
 import org.apache.doris.spark.rest.RestService
 
 import scala.util.control.Breaks
@@ -49,20 +49,20 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
 
   def write(queryExecution: QueryExecution): Unit = {
     queryExecution.toRdd.foreachPartition(iter => {
-      val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]]()
+      val objectMapper = new ObjectMapper()
+      val arrayNode = objectMapper.createArrayNode()
       iter.foreach(row => {
         val line: util.List[Object] = new util.ArrayList[Object](maxRowCount)
         for (i <- 0 until row.numFields) {
           val field = row.copy().getUTF8String(i)
-          line.add(field.asInstanceOf[AnyRef])
+          arrayNode.add(objectMapper.readTree(field.toString))
         }
-        rowsBuffer.add(line)
-        if (rowsBuffer.size > maxRowCount - 1) {
+        if (arrayNode.size > maxRowCount - 1) {
           flush
         }
       })
       // flush buffer
-      if (!rowsBuffer.isEmpty) {
+      if (!arrayNode.isEmpty) {
         flush
       }
 
@@ -76,8 +76,8 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
 
           for (i <- 0 to maxRetryTimes) {
             try {
-              dorisStreamLoader.load(rowsBuffer)
-              rowsBuffer.clear()
+              dorisStreamLoader.load(arrayNode.toString)
+              arrayNode.removeAll()
               loop.break()
             }
             catch {
@@ -85,19 +85,19 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
                 try {
                   logger.warn("Failed to load data on BE: {} node ", dorisStreamLoader.getLoadUrlStr)
                   //If the current BE node fails to execute Stream Load, randomly switch to other BE nodes and try again
-                  dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings,logger))
+                  dorisStreamLoader.setHostPort(RestService.randomBackendV2(settings, logger))
                   Thread.sleep(1000 * i)
                 } catch {
                   case ex: InterruptedException =>
-                    logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
+                    logger.warn("Data that failed to load : " + arrayNode.toString)
                     Thread.currentThread.interrupt()
                     throw new IOException("unable to flush; interrupted while doing another attempt", e)
                 }
             }
           }
 
-          if(!rowsBuffer.isEmpty){
-            logger.warn("Data that failed to load : " + dorisStreamLoader.listToString(rowsBuffer))
+          if (!arrayNode.isEmpty) {
+            logger.warn("Data that failed to load : " + arrayNode.toString)
             throw new IOException(s"Failed to load data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max retry times.")
           }
         }
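
Note that the sink now runs objectMapper.readTree on every Kafka record value,
so each message must itself be a well-formed JSON object whose keys match the
Doris columns; a plain CSV value would make readTree throw. A hedged producer
sketch against the broker and topic the test below assumes:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object JsonMessageProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "127.0.0.1:9093")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Each value is a complete JSON object keyed by the Doris column names.
    val value = """{"order_id":"4","order_amount":400,"order_status":"pending payment"}"""
    producer.send(new ProducerRecord[String, String]("test_spark", value))
    producer.close()
  }
}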
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
new file mode 100644
index 0000000..999a92c
--- /dev/null
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestConnectorWriteDoris.scala
@@ -0,0 +1,89 @@
+package org.apache.doris.spark.sql
+
+import org.apache.spark.sql.SparkSession
+import org.junit.Test
+
+class TestConnectorWriteDoris {
+
+  val dorisFeNodes = "127.0.0.1:8030"
+  val dorisUser = "root"
+  val dorisPwd = ""
+  val dorisTable = "test.test_order"
+
+  val kafkaServers = "127.0.0.1:9093"
+  val kafkaTopics = "test_spark"
+
+  @Test
+  def listDataWriteTest(): Unit = {
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = spark.createDataFrame(Seq(
+      ("1", 100, "待付款"),
+      ("2", 200, "待发货"),
+      ("3", 300, "已收货")
+    )).toDF("order_id", "order_amount", "order_status")
+    df.write
+      .format("doris")
+      .option("doris.fenodes", dorisFeNodes)
+      .option("doris.table.identifier", dorisTable)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
+      .save()
+    spark.stop()
+  }
+
+
+  @Test
+  def csvDataWriteTest(): Unit = {
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    val df = spark.read
+      .option("header", "true") // uses the first line as names of columns
+      .option("inferSchema", "true") // infers the input schema automatically from data
+      .csv("data.csv")
+    df.createTempView("tmp_tb")
+    val doris = spark.sql(
+      """
+        |CREATE TEMPORARY VIEW test_lh
+        |USING doris
+        |OPTIONS(
+        | "table.identifier"="test.test_lh",
+        | "fenodes"="127.0.0.1:8030",
+        | "user"="root",
+        | "password"=""
+        |);
+        |""".stripMargin)
+    spark.sql(
+      """
+        |INSERT INTO test_lh SELECT name, gender, age FROM tmp_tb;
+        |""".stripMargin)
+    spark.stop()
+  }
+
+  @Test
+  def structuredStreamingWriteTest(): Unit = {
+    val spark = SparkSession.builder()
+      .master("local")
+      .getOrCreate()
+    val df = spark.readStream
+      .option("kafka.bootstrap.servers", kafkaServers)
+      .option("startingOffsets", "latest")
+      .option("subscribe", kafkaTopics)
+      .format("kafka")
+      .option("failOnDataLoss", false)
+      .load()
+
+    df.selectExpr("CAST(value AS STRING)")
+      .writeStream
+      .format("doris")
+      .option("checkpointLocation", "/tmp/test")
+      .option("doris.table.identifier", dorisTable)
+      .option("doris.fenodes", dorisFeNodes)
+      .option("user", dorisUser)
+      .option("password", dorisPwd)
+      .option("sink.batch.size", 2)
+      .option("sink.max-retries", 2)
+      .start().awaitTermination()
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org