You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/03/01 10:45:23 UTC
carbondata git commit: [CARBONDATA-2055][Streaming] Support
integrating Stream table with Spark Streaming
Repository: carbondata
Updated Branches:
refs/heads/master e078ef4d9 -> a0fc0be02
[CARBONDATA-2055][Streaming] Support integrating Stream table with Spark Streaming
This closes #1867
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a0fc0be0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a0fc0be0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a0fc0be0
Branch: refs/heads/master
Commit: a0fc0be020d4e27df5c0c0649b810b1744f930c3
Parents: e078ef4
Author: Zhang Zhichao <44...@qq.com>
Authored: Sat Jan 27 00:03:19 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Thu Mar 1 18:42:25 2018 +0800
----------------------------------------------------------------------
.../CarbonBatchSparkStreamingExample.scala | 6 +-
.../CarbonStreamSparkStreamingExample.scala | 218 +++++++++++++++++++
...CarbonStructuredStreamingWithRowParser.scala | 2 +-
integration/spark2/pom.xml | 6 +
.../spark/sql/CarbonSparkStreamingFactory.scala | 60 +++++
.../TestStreamingTableWithRowParser.scala | 2 +-
streaming/pom.xml | 6 +
.../streaming/parser/CarbonStreamParser.java | 3 +
.../CarbonSparkStreamingListener.scala | 31 +++
.../streaming/CarbonStreamSparkStreaming.scala | 187 ++++++++++++++++
10 files changed, 514 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
index 6ae87b9..ef4dbce 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
@@ -167,15 +167,11 @@ object CarbonBatchSparkStreamingExample {
.map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
- val df = SparkSession.builder().getOrCreate()
- .createDataFrame(rdd).toDF("id", "name", "city", "salary")
+ val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
println("at time: " + time.toString() + " the count of received data: " + df.count())
df.write
.format("carbondata")
.option("tableName", tableName)
- .option("tempCSV", "false")
- .option("compress", "true")
- .option("single_pass", "true")
.mode(SaveMode.Append)
.save()
}}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
new file mode 100644
index 0000000..f59a610
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.CarbonSparkStreamingFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.streaming.CarbonSparkStreamingListener
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+/**
+ * This example shows how to use Spark Streaming to write data
+ * to a CarbonData stream table.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+// scalastyle:off println
+object CarbonStreamSparkStreamingExample {
+
+  /**
+   * Runs the example: creates a streaming carbon table, batch-loads the sample CSV,
+   * ingests socket data through Spark Streaming until the streaming context terminates,
+   * and finally prints a few queries against the table.
+   *
+   * @param args command line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+
+    // setup paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_stream_table"
+
+    val spark = ExampleUtils.createCarbonSession("CarbonStreamSparkStreamingExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop table if exists previously
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // Create target carbon table and populate with initial data
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'streaming'='true',
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city')
+           | """.stripMargin)
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      try {
+        val thread1 = writeSocket(serverSocket)
+        val thread2 = showTableCount(spark, streamTableName)
+        try {
+          val ssc = startStreaming(spark, streamTableName, tablePath, checkpointPath)
+          // add a Spark listener that removes all stream table locks when the app stops
+          ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
+          // wait for the stop signal in a background thread
+          waitForStopSignal(ssc)
+          // the Spark Streaming app needs to be started in the main thread,
+          // otherwise it will encounter a not-serializable exception.
+          ssc.start()
+          ssc.awaitTermination()
+        } finally {
+          // always stop the helper threads, even when streaming failed
+          thread1.interrupt()
+          thread2.interrupt()
+        }
+      } finally {
+        // always release port 7071
+        serverSocket.close()
+      }
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // range filter on id
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  /**
+   * Starts a background thread that repeatedly prints the current time, the row count
+   * and the segment list of the given table.
+   *
+   * The thread polls at most 1001 times with a 5 second pause after each poll and ends
+   * quietly when it is interrupted during that pause.
+   *
+   * @param spark     session used to run the monitoring queries
+   * @param tableName name of the table to monitor
+   * @return the already started monitoring thread
+   */
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        try {
+          for (_ <- 0 to 1000) {
+            println(System.currentTimeMillis())
+            spark.sql(s"select count(*) from $tableName").show(truncate = false)
+            spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+            Thread.sleep(1000 * 5)
+          }
+        } catch {
+          // interrupt() from the caller is the expected way to stop this thread;
+          // restore the flag and leave instead of dying with an uncaught exception
+          case _: InterruptedException => Thread.currentThread().interrupt()
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Starts a background thread that blocks until a client connects to port 7072 and
+   * then stops the given streaming context gracefully, leaving the SparkContext running.
+   *
+   * @param ssc streaming context to stop once the signal arrives
+   * @return the already started waiting thread
+   */
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use command 'nc 127.0.0.1 7072' to stop Spark Streaming App
+        val stopSocket = new ServerSocket(7072)
+        try {
+          // the connection itself is the signal; no data is read from it
+          stopSocket.accept().close()
+        } finally {
+          // release the port instead of leaking the listening socket
+          stopSocket.close()
+        }
+        // don't stop SparkContext here
+        ssc.stop(stopSparkContext = false, stopGracefully = true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  /**
+   * Builds (but does not start) a streaming context that reads comma separated lines
+   * from localhost:7071, converts them to [[DStreamData]] and writes every batch to the
+   * given stream table of the "default" database using the row parser.
+   *
+   * @param spark          session that owns the SparkContext and runs the writes
+   * @param tableName      name of the target stream table
+   * @param tablePath      path of the target table (not used by this method)
+   * @param checkpointPath directory used for Spark Streaming checkpoints
+   * @return the configured streaming context; the caller is responsible for starting it
+   * @throws Exception if the streaming pipeline cannot be set up
+   */
+  def startStreaming(spark: SparkSession, tableName: String,
+      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+    // recommend: use a fairly large batch interval, such as 30s or 1min.
+    val ssc = new StreamingContext(spark.sparkContext, Seconds(30))
+    try {
+      ssc.checkpoint(checkpointPath)
+
+      val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+      val batchData = readSocketDF
+        .map(_.split(","))
+        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+      println("init carbon table info")
+      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+        val df = spark.createDataFrame(rdd).toDF()
+        println(System.currentTimeMillis().toString() +
+                " at batch time: " + time.toString() +
+                " the count of received data: " + df.count())
+        CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
+          .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+            CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+          .mode(SaveMode.Append)
+          .writeStreamData(df, time)
+      }}
+      ssc
+    } catch {
+      case ex: Exception =>
+        // report the failure and propagate it; returning normally here would hand the
+        // caller a context whose pipeline was never fully set up
+        ex.printStackTrace()
+        println("Failed to set up the streaming pipeline")
+        throw ex
+    }
+  }
+
+  /**
+   * Starts a background thread that accepts one client on the given server socket and
+   * sends it generated records: 1000 rounds of 101 lines each, with a flush and a
+   * 2 second pause after every round.
+   *
+   * The connection is closed when all rounds are written or when the thread is
+   * interrupted during a pause.
+   *
+   * @param serverSocket listening socket on which the single client is accepted
+   * @return the already started writer thread
+   */
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        try {
+          var index = 0
+          for (_ <- 1 to 1000) {
+            // write 101 records per iteration ("0 to 100" is inclusive)
+            for (_ <- 0 to 100) {
+              index = index + 1
+              socketWriter.println(index.toString + ",name_" + index
+                                   + ",city_" + index + "," + (index * 10000.00).toString +
+                                   ",school_" + index + ":school_" + index + index + "$" + index)
+            }
+            socketWriter.flush()
+            Thread.sleep(2000)
+          }
+        } catch {
+          // interrupt() from the caller is the expected way to stop this thread early
+          case _: InterruptedException => Thread.currentThread().interrupt()
+        } finally {
+          // release the connection on every exit path
+          socketWriter.close()
+          clientSocket.close()
+          System.out.println("Socket closed")
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
index f134a8d..cce833b 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
@@ -171,7 +171,7 @@ object CarbonStructuredStreamingWithRowParser {
.option("dbName", "default")
.option("tableName", "stream_table_with_row_parser")
.option(CarbonStreamParser.CARBON_STREAM_PARSER,
- "org.apache.carbondata.streaming.parser.RowStreamParserImp")
+ CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
.start()
qry.awaitTermination()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index b68a55d..7d1d5bb 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -48,6 +48,12 @@
<artifactId>spark-repl_${scala.binary.version}</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.deps.scope}</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
new file mode 100644
index 0000000..15b038b
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
+import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
+
+/**
+ * Creates a [[CarbonStreamSparkStreamingWriter]] for a stream table
+ * when integrating with Spark Streaming.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+object CarbonSparkStreamingFactory {
+
+  /**
+   * Returns the writer registered under `dbName.tableName`, creating and registering
+   * a new one when none is registered yet. The whole lookup runs while holding this
+   * object's monitor.
+   *
+   * @param spark     session used to resolve the table and to build the Hadoop conf
+   * @param dbNameStr database name; an empty value falls back to "default"
+   * @param tableName name of the stream table
+   * @return the registered or newly created writer
+   * @throws CarbonStreamException when a new writer is needed and the table name is
+   *                               empty or contains a space, or the table is not a
+   *                               streaming table
+   */
+  def getStreamSparkStreamingWriter(spark: SparkSession,
+      dbNameStr: String,
+      tableName: String): CarbonStreamSparkStreamingWriter =
+    synchronized {
+      val database = if (StringUtils.isEmpty(dbNameStr)) "default" else dbNameStr
+      val cacheKey = database + "." + tableName
+      val writers = CarbonStreamSparkStreaming.getTableMap
+      if (!writers.containsKey(cacheKey)) {
+        val created = newWriter(spark, database, tableName)
+        writers.put(cacheKey, created)
+        created
+      } else {
+        writers.get(cacheKey)
+      }
+    }
+
+  /**
+   * Validates the table name, resolves the table and builds a writer for it.
+   */
+  private def newWriter(spark: SparkSession,
+      database: String,
+      tableName: String): CarbonStreamSparkStreamingWriter = {
+    val invalidName = StringUtils.isEmpty(tableName) || tableName.contains(" ")
+    if (invalidName) {
+      throw new CarbonStreamException("Table creation failed. " +
+                                      "Table name must not be blank or " +
+                                      "cannot contain blank space")
+    }
+    val table = CarbonEnv.getCarbonTable(Some(database), tableName)(spark)
+    if (!table.isStreamingTable) {
+      throw new CarbonStreamException(s"Table ${table.getDatabaseName}." +
+                                      s"${table.getTableName} is not a streaming table")
+    }
+    new CarbonStreamSparkStreamingWriter(spark, table, spark.sessionState.newHadoopConf())
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
index a3df2be..3e3b2c5 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala
@@ -784,7 +784,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll {
.option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
.option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
.option(CarbonStreamParser.CARBON_STREAM_PARSER,
- "org.apache.carbondata.streaming.parser.RowStreamParserImp")
+ CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
.start()
qry.awaitTermination()
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 987e530..968aa8b 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -26,6 +26,12 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>${spark.deps.scope}</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
index 643758c..e335626 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -31,6 +31,9 @@ public interface CarbonStreamParser {
String CARBON_STREAM_PARSER_DEFAULT =
"org.apache.carbondata.streaming.parser.CSVStreamParserImp";
+ String CARBON_STREAM_PARSER_ROW_PARSER =
+ "org.apache.carbondata.streaming.parser.RowStreamParserImp";
+
void initialize(Configuration configuration, StructType structType);
Object[] parserRow(InternalRow value);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
new file mode 100644
index 0000000..6d1fa45
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.scheduler.SparkListenerApplicationEnd
+
+/**
+ * Spark listener that delegates to [[CarbonStreamSparkStreaming.cleanAllLockAfterStop]]
+ * when the application ends.
+ */
+class CarbonSparkStreamingListener extends SparkListener {
+
+  /**
+   * Invoked on application end; releases every stream table lock that is still
+   * registered.
+   */
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit =
+    CarbonStreamSparkStreaming.cleanAllLockAfterStop()
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a0fc0be0/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
new file mode 100644
index 0000000..4aa1517
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.Time
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ * Interface used to write stream data to a stream table
+ * when integrating with Spark Streaming.
+ *
+ * NOTE: Current integration with Spark Streaming is an alpha feature.
+ */
+class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession,
+    val carbonTable: CarbonTable,
+    val configuration: Configuration) {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // options handed to the sink factory; the first value stored for a key is kept
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+  // NOTE(review): the stored mode is not read anywhere inside this class
+  private var mode: SaveMode = SaveMode.ErrorIfExists
+
+  // state created lazily by initialize()
+  private var isInitialize: Boolean = false
+  private var lock: ICarbonLock = null
+  private var carbonAppendableStreamSink: Sink = null
+
+  this.option("dbName", carbonTable.getDatabaseName)
+  this.option("tableName", carbonTable.getTableName)
+
+  /** "database.table" name of the target table, used in log and error messages. */
+  private def qualifiedTableName: String =
+    carbonTable.getDatabaseName + "." + carbonTable.getTableName
+
+  /**
+   * Acquires the streaming lock of the table, retrying as implemented by the lock.
+   *
+   * @throws InterruptedException when the lock cannot be acquired
+   */
+  def lockStreamTable(): Unit = {
+    lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.STREAMING_LOCK)
+    if (!lock.lockWithRetries()) {
+      LOGGER.error(s"Not able to acquire the lock for stream table:${qualifiedTableName}")
+      throw new InterruptedException(
+        s"Not able to acquire the lock for stream table: ${qualifiedTableName}")
+    }
+    LOGGER.info(s"Acquired the lock for stream table: ${qualifiedTableName}")
+  }
+
+  /**
+   * Releases the streaming lock if one was obtained before; otherwise does nothing.
+   */
+  def unLockStreamTable(): Unit = {
+    Option(lock).foreach { held =>
+      held.unlock()
+      LOGGER.info(s"unlock for stream table: ${qualifiedTableName}")
+    }
+  }
+
+  /**
+   * Creates the stream sink from the collected options, then takes the table lock and
+   * marks this writer as initialized.
+   */
+  def initialize(): Unit = {
+    val sink = StreamSinkFactory.createStreamTableSink(
+      sparkSession,
+      configuration,
+      carbonTable,
+      extraOptions.toMap)
+    carbonAppendableStreamSink = sink.asInstanceOf[CarbonAppendableStreamSink]
+
+    lockStreamTable()
+
+    isInitialize = true
+  }
+
+  /**
+   * Writes one batch to the sink, initializing this writer on first use.
+   *
+   * @param dataFrame data of the batch
+   * @param time      batch time; its milliseconds value is passed as the batch id
+   */
+  def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
+    if (!isInitialize) initialize()
+    val batchId = time.milliseconds
+    carbonAppendableStreamSink.addBatch(batchId, dataFrame)
+  }
+
+  /**
+   * Stores the save mode. The value is only replaced while the current mode is still
+   * `SaveMode.ErrorIfExists`, so later calls leave an already chosen mode untouched.
+   *
+   * @param saveMode mode to store
+   * @return this writer, for call chaining
+   */
+  def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
+    val stillDefault = mode == SaveMode.ErrorIfExists
+    if (stillDefault) mode = saveMode
+    this
+  }
+
+  /**
+   * Stores the save mode given by name (case-insensitive): `overwrite`, `append`,
+   * `ignore`, `error` or `default`. The name is only parsed and stored while the
+   * current mode is still `SaveMode.ErrorIfExists`.
+   *
+   * @param saveMode name of the mode to store
+   * @return this writer, for call chaining
+   * @throws IllegalArgumentException when the name is parsed and not recognized
+   */
+  def mode(saveMode: String): CarbonStreamSparkStreamingWriter = {
+    if (mode == SaveMode.ErrorIfExists) {
+      val normalized = saveMode.toLowerCase(util.Locale.ROOT)
+      mode =
+        if (normalized == "overwrite") {
+          SaveMode.Overwrite
+        } else if (normalized == "append") {
+          SaveMode.Append
+        } else if (normalized == "ignore") {
+          SaveMode.Ignore
+        } else if (normalized == "error" || normalized == "default") {
+          SaveMode.ErrorIfExists
+        } else {
+          throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
+            "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
+        }
+    }
+    this
+  }
+
+  /**
+   * Adds an output option unless the key is already present.
+   *
+   * @return this writer, for call chaining
+   */
+  def option(key: String, value: String): CarbonStreamSparkStreamingWriter = {
+    extraOptions.getOrElseUpdate(key, value)
+    this
+  }
+
+  /**
+   * Adds a boolean output option, stored as its string form.
+   */
+  def option(key: String, value: Boolean): CarbonStreamSparkStreamingWriter = {
+    option(key, value.toString)
+  }
+
+  /**
+   * Adds a long output option, stored as its string form.
+   */
+  def option(key: String, value: Long): CarbonStreamSparkStreamingWriter = {
+    option(key, value.toString)
+  }
+
+  /**
+   * Adds a double output option, stored as its string form.
+   */
+  def option(key: String, value: Double): CarbonStreamSparkStreamingWriter = {
+    option(key, value.toString)
+  }
+}
+
+/**
+ * Holds the writers created for stream tables, keyed by the string the caller
+ * registers them under.
+ */
+object CarbonStreamSparkStreaming {
+
+  @transient private val tableMap =
+    new util.HashMap[String, CarbonStreamSparkStreamingWriter]()
+
+  /**
+   * Returns the mutable map of registered writers.
+   */
+  def getTableMap: util.Map[String, CarbonStreamSparkStreamingWriter] = tableMap
+
+  /**
+   * Releases the stream lock of every registered writer and then empties the map.
+   */
+  def cleanAllLockAfterStop(): Unit = {
+    val writers = tableMap.values().asScala
+    for (writer <- writers) {
+      writer.unLockStreamTable()
+    }
+    tableMap.clear()
+  }
+}