You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/06 14:51:53 UTC

carbondata git commit: [CARBONDATA-1848] Carbondata streaming sink adapt spark 2.2

Repository: carbondata
Updated Branches:
  refs/heads/master 59eff88b0 -> e36257fd2


[CARBONDATA-1848] Carbondata streaming sink adapt spark 2.2

Adapt the CarbonData streaming sink to Spark 2.2.

This closes #1611


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e36257fd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e36257fd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e36257fd

Branch: refs/heads/master
Commit: e36257fd2548c86a95743ccc1096991f7be67d06
Parents: 59eff88
Author: QiangCai <qi...@qq.com>
Authored: Tue Dec 5 15:31:05 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 6 22:51:36 2017 +0800

----------------------------------------------------------------------
 .../org/apache/carbondata/examples/StreamExample.scala  |  3 +--
 .../spark/carbondata/TestStreamingTableOperation.scala  |  2 --
 .../streaming/CarbonStreamingQueryListener.scala        | 12 +++++++++++-
 3 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
index 77c20bd..b59e960 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamExample.scala
@@ -21,7 +21,6 @@ import java.io.{File, PrintWriter}
 import java.net.ServerSocket
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -169,7 +168,7 @@ object StreamExample {
             .format("carbondata")
             .trigger(ProcessingTime("5 seconds"))
             .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
-            .option("tablePath", tablePath.getPath)
+            .option("dbName", "default")
             .option("tableName", "stream_table")
             .start()
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index de9d61f..75dcbdf 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -799,7 +799,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .format("carbondata")
             .trigger(ProcessingTime(s"$intervalSecond seconds"))
             .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
-            .option("tablePath", tablePath.getPath)
             .option("bad_records_action", badRecordAction)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
@@ -916,7 +915,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
             .format("carbondata")
             .trigger(ProcessingTime(s"${ intervalSecond } seconds"))
             .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
-            .option("tablePath", tablePath.getPath)
             .option("dbName", tableIdentifier.database.get)
             .option("tableName", tableIdentifier.table)
             .start()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e36257fd/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index c2789f4..07ef8ca 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util
 import java.util.UUID
 
+import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.streaming.StreamingQueryListener
 
@@ -33,7 +34,16 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
   private val cache = new util.HashMap[UUID, ICarbonLock]()
 
   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
-    val qry = spark.streams.get(event.id).asInstanceOf[StreamExecution]
+    val streamQuery = spark.streams.get(event.id)
+    val qry = if (SPARK_VERSION.startsWith("2.1")) {
+      // adapt spark 2.1
+      streamQuery.asInstanceOf[StreamExecution]
+    } else {
+      // adapt spark 2.2 and later version
+      val clazz = Class.forName("org.apache.spark.sql.execution.streaming.StreamingQueryWrapper")
+      val method = clazz.getMethod("streamingQuery")
+      method.invoke(streamQuery).asInstanceOf[StreamExecution]
+    }
     if (qry.sink.isInstanceOf[CarbonAppendableStreamSink]) {
       LOGGER.info("Carbon streaming query started: " + event.id)
       val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]