Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/09/19 02:43:52 UTC
carbondata git commit: [CARBONDATA-2945] Support ingesting JSON records using StreamSQL
Repository: carbondata
Updated Branches:
refs/heads/master 05033f71e -> b54512d1c
[CARBONDATA-2945] Support ingesting JSON records using StreamSQL
Support ingesting JSON records from a Kafka/socket stream source in StreamSQL.
A table property named "record_format" is added. For example, the following creates a stream source table on Kafka whose records are formatted as JSON:
CREATE TABLE source (
id INT,
name STRING,
city STRING,
salary FLOAT,
file struct<school:array<string>, age:int>
)
STORED AS carbondata
TBLPROPERTIES(
'streaming'='source',
'format'='kafka',
'kafka.bootstrap.servers'='localhost:9092',
'subscribe'='test',
'record_format'='json' -- can be 'csv' or 'json'
)
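Once the source table exists, the ingestion itself is started with CREATE STREAM. A sketch following the StreamSQL syntax used in StreamSQLExample; the stream name "ingest", the sink table, and the trigger interval are illustrative:
CREATE STREAM ingest ON TABLE sink
STMPROPERTIES(
  'trigger'='ProcessingTime',
  'interval'='3 seconds')
AS
  SELECT *
  FROM source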
This closes #2731
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b54512d1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b54512d1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b54512d1
Branch: refs/heads/master
Commit: b54512d1c15026268b85e0ab5ae46699cf8d7082
Parents: 05033f7
Author: Jacky Li <ja...@qq.com>
Authored: Tue Sep 18 00:27:57 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Sep 19 10:39:41 2018 +0800
----------------------------------------------------------------------
.../carbondata/examples/StreamSQLExample.scala | 17 ++++-------
.../examples/StructuredStreamingExample.scala | 16 ++++++----
.../stream/CarbonCreateStreamCommand.scala | 32 +++++++++++++-------
3 files changed, 37 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b54512d1/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
index 58f51bd..857a7ae 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala
@@ -17,7 +17,6 @@
package org.apache.carbondata.examples
-import java.io.File
import java.net.ServerSocket
import org.apache.carbondata.examples.util.ExampleUtils
@@ -26,13 +25,9 @@ import org.apache.carbondata.examples.util.ExampleUtils
object StreamSQLExample {
def main(args: Array[String]) {
- // setup paths
- val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
-
val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4)
-
val requireCreateTable = true
+ val recordFormat = "json" // can be "json" or "csv"
if (requireCreateTable) {
// drop table if exists previously
@@ -45,7 +40,6 @@ object StreamSQLExample {
| CREATE TABLE sink(
| id INT,
| name STRING,
- | city STRING,
| salary FLOAT,
| file struct<school:array<string>, age:int>
| )
@@ -56,11 +50,10 @@ object StreamSQLExample {
}
spark.sql(
- """
+ s"""
| CREATE TABLE source (
| id INT,
| name STRING,
- | city STRING,
| salary FLOAT,
| file struct<school:array<string>, age:int>
| )
@@ -69,7 +62,9 @@ object StreamSQLExample {
| 'streaming'='source',
| 'format'='socket',
| 'host'='localhost',
- | 'port'='7071')
+ | 'port'='7071',
+ | 'record_format'='$recordFormat'
+ | )
""".stripMargin)
val serverSocket = new ServerSocket(7071)
@@ -86,7 +81,7 @@ object StreamSQLExample {
// start writing data into the socket
import StructuredStreamingExample.{showTableCount, writeSocket}
- val thread1 = writeSocket(serverSocket)
+ val thread1 = writeSocket(serverSocket, recordFormat)
val thread2 = showTableCount(spark, "sink")
System.out.println("type enter to interrupt streaming")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b54512d1/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
index 31de668..4e099af 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala
@@ -52,7 +52,6 @@ object StructuredStreamingExample {
| CREATE TABLE ${ streamTableName }(
| id INT,
| name STRING,
- | city STRING,
| salary FLOAT,
| file struct<school:array<string>, age:int>
| )
@@ -66,7 +65,6 @@ object StructuredStreamingExample {
| CREATE TABLE ${ streamTableName }(
| id INT,
| name STRING,
- | city STRING,
| salary FLOAT
| )
| STORED BY 'carbondata'
@@ -176,7 +174,7 @@ object StructuredStreamingExample {
thread
}
- def writeSocket(serverSocket: ServerSocket): Thread = {
+ def writeSocket(serverSocket: ServerSocket, recordFormat: String = "csv"): Thread = {
val thread = new Thread() {
override def run(): Unit = {
// wait for client connection request and accept
@@ -187,9 +185,15 @@ object StructuredStreamingExample {
// write 5 records per iteration
for (_ <- 0 to 1000) {
index = index + 1
- socketWriter.println(index.toString + ",name_" + index
- + ",city_" + index + "," + (index * 10000.00).toString +
- ",school_" + index + ":school_" + index + index + "$" + index)
+ recordFormat match {
+ case "csv" =>
+ socketWriter.println(index.toString + ",name_" + index
+ + "," + (index * 10000.00).toString +
+ ",school_" + index + ":school_" + index + index + "$" + index)
+ case "json" =>
+ socketWriter.println(
+ s"""{"id":$index,"name":"s","salary":4.3,"file":{"school":["a","b"],"age":6}}""")
+ }
}
socketWriter.flush()
Thread.sleep(1000)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b54512d1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index 94e063b..1f8bde2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -80,11 +80,11 @@ case class CarbonCreateStreamCommand(
val updatedQuery = if (format.equals("kafka")) {
shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable)
shouldHaveProperty(tblProperty, "subscribe", sourceTable)
- createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty)
+ createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty.asScala)
} else if (format.equals("socket")) {
shouldHaveProperty(tblProperty, "host", sourceTable)
shouldHaveProperty(tblProperty, "port", sourceTable)
- createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty)
+ createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty.asScala)
} else {
// Replace the logical relation with a streaming relation created
// from the stream source table
@@ -138,7 +138,7 @@ case class CarbonCreateStreamCommand(
inputQuery: DataFrame,
sourceTable: CarbonTable,
sourceName: String,
- tblProperty: util.Map[String, String]): LogicalPlan = {
+ tblProperty: mutable.Map[String, String]): LogicalPlan = {
// We follow 3 steps to generate new plan
// 1. replace the logical relation in stream query with streaming relation
// 2. collect the new ExprId generated
@@ -151,32 +151,42 @@ case class CarbonCreateStreamCommand(
s"case when size(_values) > $i then _values[$i] else null end AS $columnName"
}
- val delimiter = tblProperty.asScala.getOrElse("delimiter", ",")
val aliasMap = new util.HashMap[String, ExprId]()
val updatedQuery = inputQuery.logicalPlan transform {
case r: LogicalRelation
if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
// for kafka stream source, get the 'value' column and split it by using UDF
- val kafkaPlan = sparkSession.readStream
+ val plan = sparkSession.readStream
.format(sourceName)
.options(tblProperty)
.load()
.selectExpr("CAST(value as string) as _value")
- .selectExpr(
- s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
- .selectExpr(exprList: _*)
- .logicalPlan
+ val recordFormat = tblProperty.getOrElse("record_format", "csv")
+ val newPlan = recordFormat match {
+ case "csv" =>
+ val delimiter = tblProperty.getOrElse("delimiter", ",")
+ plan.selectExpr(
+ s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values")
+ .selectExpr(exprList: _*)
+ .logicalPlan
+ case "json" =>
+ import org.apache.spark.sql.functions._
+ plan
+ .select(from_json(col("_value"), Util.convertToSparkSchema(sourceTable)) as "_data")
+ .select("_data.*")
+ .logicalPlan
+ }
// collect the newly generated ExprId
- kafkaPlan collect {
+ newPlan collect {
case p@Project(projectList, child) =>
projectList.map { expr =>
aliasMap.put(expr.name, expr.exprId)
}
p
}
- kafkaPlan
+ newPlan
case plan: LogicalPlan => plan
}