You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/06 14:51:53 UTC
carbondata git commit: [CARBONDATA-1848] Carbondata streaming sink adapt spark 2.2
Repository: carbondata
Updated Branches:
refs/heads/master 59eff88b0 -> e36257fd2
[CARBONDATA-1848] Carbondata streaming sink adapt spark 2.2
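Adapt the CarbonData streaming sink to Spark 2.2: on Spark versions other than 2.1, CarbonStreamingQueryListener obtains the StreamExecution from StreamingQueryWrapper via reflection, and the example and tests no longer pass the "tablePath" option (the table is identified by "dbName" and "tableName").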
This closes #1611
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e36257fd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e36257fd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e36257fd
Branch: refs/heads/master
Commit: e36257fd2548c86a95743ccc1096991f7be67d06
Parents: 59eff88
Author: QiangCai <qi...@qq.com>
Authored: Tue Dec 5 15:31:05 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 6 22:51:36 2017 +0800
----------------------------------------------------------------------
.../org/apache/carbondata/examples/StreamExample.scala | 3 +--
.../spark/carbondata/TestStreamingTableOperation.scala | 2 --
.../streaming/CarbonStreamingQueryListener.scala | 12 +++++++++++-
3 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
index 77c20bd..b59e960 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -21,7 +21,6 @@ import java.io.{File, PrintWriter}
import java.net.ServerSocket
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -169,7 +168,7 @@ object StreamExample {
.format("carbondata")
.trigger(ProcessingTime("5 seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
- .option("tablePath", tablePath.getPath)
+ .option("dbName", "default")
.option("tableName", "stream_table")
.start()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index de9d61f..75dcbdf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -799,7 +799,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.format("carbondata")
.trigger(ProcessingTime(s"$intervalSecond seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
- .option("tablePath", tablePath.getPath)
.option("bad_records_action", badRecordAction)
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
@@ -916,7 +915,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
.format("carbondata")
.trigger(ProcessingTime(s"${ intervalSecond } seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
- .option("tablePath", tablePath.getPath)
.option("dbName", tableIdentifier.database.get)
.option("tableName", tableIdentifier.table)
.start()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index c2789f4..07ef8ca 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.util
import java.util.UUID
+import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
@@ -33,7 +34,16 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
private val cache = new util.HashMap[UUID, ICarbonLock]()
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
- val qry = spark.streams.get(event.id).asInstanceOf[StreamExecution]
+ val streamQuery = spark.streams.get(event.id)
+ val qry = if (SPARK_VERSION.startsWith("2.1")) {
+ // adapt spark 2.1
+ streamQuery.asInstanceOf[StreamExecution]
+ } else {
+ // adapt spark 2.2 and later version
+ val clazz = Class.forName("org.apache.spark.sql.execution.streaming.StreamingQueryWrapper")
+ val method = clazz.getMethod("streamingQuery")
+ method.invoke(streamQuery).asInstanceOf[StreamExecution]
+ }
if (qry.sink.isInstanceOf[CarbonAppendableStreamSink]) {
LOGGER.info("Carbon streaming query started: " + event.id)
val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]