Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/09/19 02:43:52 UTC

carbondata git commit: [CARBONDATA-2945] Support ingest JSON record using StreamSQL

Repository: carbondata
Updated Branches:
  refs/heads/master 05033f71e -> b54512d1c


[CARBONDATA-2945] Support ingest JSON record using StreamSQL

Support ingesting JSON records from a Kafka or socket stream source in StreamSQL.
A table property named "record_format" is added. For example, the following creates a stream source table on Kafka whose record format is JSON:

CREATE TABLE source (
    id INT,
    name STRING,
    city STRING,
    salary FLOAT,
    file struct<school:array<string>, age:int>
)
STORED AS carbondata
TBLPROPERTIES(
    'streaming'='source',
    'format'='kafka',
    'kafka.bootstrap.servers'='localhost:9092',
    'subscribe'='test',
    'record_format'='json'   -- can be 'csv' or 'json'
)
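
Once the stream source table exists, a stream job can be started to ingest from it into a streaming sink table. A minimal sketch (not part of this change; it assumes the CREATE STREAM syntax used by the StreamSQL examples, with "ingest" as a placeholder stream name):

spark.sql(
  """
    | CREATE STREAM ingest ON TABLE sink
    | STMPROPERTIES(
    | 'trigger'='ProcessingTime',
    | 'interval'='3 seconds')
    | AS SELECT * FROM source
  """.stripMargin)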

This closes #2731


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b54512d1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b54512d1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b54512d1

Branch: refs/heads/master
Commit: b54512d1c15026268b85e0ab5ae46699cf8d7082
Parents: 05033f7
Author: Jacky Li <ja...@qq.com>
Authored: Tue Sep 18 00:27:57 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Sep 19 10:39:41 2018 +0800

----------------------------------------------------------------------
 .../carbondata/examples/StreamSQLExample.scala  | 17 ++++-------
 .../examples/StructuredStreamingExample.scala   | 16 ++++++----
 .../stream/CarbonCreateStreamCommand.scala      | 32 +++++++++++++-------
 3 files changed, 37 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b54512d1/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
index 58f51bd..857a7ae 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.examples
 
-import java.io.File
 import java.net.ServerSocket
 
 import org.apache.carbondata.examples.util.ExampleUtils
@@ -26,13 +25,9 @@ import org.apache.carbondata.examples.util.ExampleUtils
 object StreamSQLExample {
   def main(args: Array[String]) {
 
-    // setup paths
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "../../../..").getCanonicalPath
-
     val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
-
     val requireCreateTable = true
+    val recordFormat = "json" // can be "json" or "csv"
 
     if (requireCreateTable) {
       // drop table if exists previously
@@ -45,7 +40,6 @@ object StreamSQLExample {
            | CREATE TABLE sink(
            | id INT,
            | name STRING,
-           | city STRING,
            | salary FLOAT,
            | file struct<school:array<string>, age:int>
            | )
@@ -56,11 +50,10 @@ object StreamSQLExample {
     }
 
     spark.sql(
-      """
+      s"""
         | CREATE TABLE source (
         | id INT,
         | name STRING,
-        | city STRING,
         | salary FLOAT,
         | file struct<school:array<string>, age:int>
         | )
@@ -69,7 +62,9 @@ object StreamSQLExample {
         | 'streaming'='source',
         | 'format'='socket',
         | 'host'='localhost',
-        | 'port'='7071')
+        | 'port'='7071',
+        | 'record_format'='$recordFormat'
+        | )
       """.stripMargin)
 
     val serverSocket = new ServerSocket(7071)
@@ -86,7 +81,7 @@ object StreamSQLExample {
 
     // start writing data into the socket
     import StructuredStreamingExample.{showTableCount, writeSocket}
-    val thread1 = writeSocket(serverSocket)
+    val thread1 = writeSocket(serverSocket, recordFormat)
     val thread2 = showTableCount(spark, "sink")
 
     System.out.println("type enter to interrupt streaming")

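The example above drives the socket source; the same record_format property works unchanged for a Kafka source table, as shown in the commit message. A sketch of the Kafka variant of the CREATE TABLE above (broker address and topic name are placeholders):

spark.sql(
  s"""
     | CREATE TABLE source (
     | id INT,
     | name STRING,
     | salary FLOAT,
     | file struct<school:array<string>, age:int>
     | )
     | STORED AS carbondata
     | TBLPROPERTIES(
     | 'streaming'='source',
     | 'format'='kafka',
     | 'kafka.bootstrap.servers'='localhost:9092',
     | 'subscribe'='test',
     | 'record_format'='$recordFormat')
   """.stripMargin)
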
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b54512d1/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
index 31de668..4e099af 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -52,7 +52,6 @@ object StructuredStreamingExample {
              | CREATE TABLE ${ streamTableName }(
              | id INT,
              | name STRING,
-             | city STRING,
              | salary FLOAT,
              | file struct<school:array<string>, age:int>
              | )
@@ -66,7 +65,6 @@ object StructuredStreamingExample {
              | CREATE TABLE ${ streamTableName }(
              | id INT,
              | name STRING,
-             | city STRING,
              | salary FLOAT
              | )
              | STORED BY 'carbondata'
@@ -176,7 +174,7 @@ object StructuredStreamingExample {
     thread
   }
 
-  def writeSocket(serverSocket: ServerSocket): Thread = {
+  def writeSocket(serverSocket: ServerSocket, recordFormat: String = "csv"): Thread = {
     val thread = new Thread() {
       override def run(): Unit = {
         // wait for client to connection request and accept
@@ -187,9 +185,15 @@ object StructuredStreamingExample {
           // write 5 records per iteration
           for (_ <- 0 to 1000) {
             index = index + 1
-            socketWriter.println(index.toString + ",name_" + index
-                                 + ",city_" + index + "," + (index * 10000.00).toString +
-                                 ",school_" + index + ":school_" + index + index + "$" + index)
+            recordFormat match {
+              case "csv" =>
+                socketWriter.println(index.toString + ",name_" + index
+                                     + "," + (index * 10000.00).toString +
+                                     ",school_" + index + ":school_" + index + index + "$" + index)
+              case "json" =>
+                socketWriter.println(
+                  s"""{"id":$index,"name":"s","salary":4.3,"file":{"school":["a","b"],"age":6}}""")
+            }
           }
           socketWriter.flush()
           Thread.sleep(1000)

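For reference, one record per format as emitted by writeSocket above, for index = 1 (in the csv layout, ':' joins the array elements and '$' separates the struct's fields, which appears to follow CarbonData's default complex-type delimiters):

csv:  1,name_1,10000.0,school_1:school_11$1
json: {"id":1,"name":"s","salary":4.3,"file":{"school":["a","b"],"age":6}}
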
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b54512d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index 94e063b..1f8bde2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -80,11 +80,11 @@ case class CarbonCreateStreamCommand(
     val updatedQuery = if (format.equals("kafka")) {
       shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable)
       shouldHaveProperty(tblProperty, "subscribe", sourceTable)
-      createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty)
+      createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty.asScala)
     } else if (format.equals("socket")) {
       shouldHaveProperty(tblProperty, "host", sourceTable)
       shouldHaveProperty(tblProperty, "port", sourceTable)
-      createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty)
+      createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty.asScala)
     } else {
       // Replace the logical relation with a streaming relation created
       // from the stream source table
@@ -138,7 +138,7 @@ case class CarbonCreateStreamCommand(
       inputQuery: DataFrame,
       sourceTable: CarbonTable,
       sourceName: String,
-      tblProperty: util.Map[String, String]): LogicalPlan = {
+      tblProperty: mutable.Map[String, String]): LogicalPlan = {
     // We follow 3 steps to generate new plan
     // 1. replace the logical relation in stream query with streaming relation
     // 2. collect the new ExprId generated
@@ -151,32 +151,42 @@ case class CarbonCreateStreamCommand(
         s"case when size(_values) > $i then _values[$i] else null end AS $columnName"
     }
 
-    val delimiter = tblProperty.asScala.getOrElse("delimiter", ",")
     val aliasMap = new util.HashMap[String, ExprId]()
     val updatedQuery = inputQuery.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
         // for kafka stream source, get the 'value' column and split it by using UDF
-        val kafkaPlan = sparkSession.readStream
+        val plan = sparkSession.readStream
           .format(sourceName)
           .options(tblProperty)
           .load()
           .selectExpr("CAST(value as string) as _value")
-          .selectExpr(
-            s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
-          .selectExpr(exprList: _*)
-          .logicalPlan
+        val recordFormat = tblProperty.getOrElse("record_format", "csv")
+        val newPlan = recordFormat match {
+            case "csv" =>
+              val delimiter = tblProperty.getOrElse("delimiter", ",")
+              plan.selectExpr(
+                s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
+                .selectExpr(exprList: _*)
+                .logicalPlan
+            case "json" =>
+              import org.apache.spark.sql.functions._
+              plan
+                .select(from_json(col("_value"), Util.convertToSparkSchema(sourceTable)) as "_data")
+                .select("_data.*")
+                .logicalPlan
+          }
 
         // collect the newly generated ExprId
-        kafkaPlan collect {
+        newPlan collect {
           case p@Project(projectList, child) =>
             projectList.map { expr =>
               aliasMap.put(expr.name, expr.exprId)
             }
             p
         }
-        kafkaPlan
+        newPlan
       case plan: LogicalPlan => plan
     }