You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/19 16:26:06 UTC

carbondata git commit: [CARBONDATA-2363] Add CarbonStreamingQueryListener to SparkSession

Repository: carbondata
Updated Branches:
  refs/heads/master 366415691 -> 8999a8aff


[CARBONDATA-2363] Add CarbonStreamingQueryListener to SparkSession

This closes #2188


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8999a8af
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8999a8af
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8999a8af

Branch: refs/heads/master
Commit: 8999a8affc0966e293a1f6cb3e4dfcf34c207e0e
Parents: 3664156
Author: QiangCai <qi...@qq.com>
Authored: Thu Apr 19 15:49:31 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Apr 20 00:25:45 2018 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CarbonSource.scala | 18 +++++++++++++++++-
 1 file changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8999a8af/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index ef23926..4ea1ac9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
@@ -43,7 +44,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
+import org.apache.carbondata.streaming.{CarbonStreamException, CarbonStreamingQueryListener, StreamSinkFactory}
 
 /**
  * Carbon relation provider compliant to data source api.
@@ -245,6 +246,19 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
                                       s"${carbonTable.getTableName} is not a streaming table")
     }
 
+    // CarbonSession has added CarbonStreamingQueryListener during the initialization.
+    // But other SparkSessions didn't, so here will add the listener once.
+    if (!"CarbonSession".equals(sparkSession.getClass.getSimpleName)) {
+      if (CarbonSource.listenerAdded.get(sparkSession.hashCode()).isEmpty) {
+        synchronized {
+          if (CarbonSource.listenerAdded.get(sparkSession.hashCode()).isEmpty) {
+            sparkSession.streams.addListener(new CarbonStreamingQueryListener(sparkSession))
+            CarbonSource.listenerAdded.put(sparkSession.hashCode(), true)
+          }
+        }
+      }
+    }
+
     // create sink
     StreamSinkFactory.createStreamTableSink(
       sqlContext.sparkSession,
@@ -257,6 +271,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
 
 object CarbonSource {
 
+  lazy val listenerAdded = new mutable.HashMap[Int, Boolean]()
+
   def createTableInfoFromParams(
       parameters: Map[String, String],
       dataSchema: StructType,