You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/03/02 01:41:30 UTC

carbondata git commit: [CARBONDATA-2211] in case of DDL HandOff should not be execute in thread

Repository: carbondata
Updated Branches:
  refs/heads/master a0fc0be02 -> 7f7ea4d75


[CARBONDATA-2211] in case of DDL HandOff should not be execute in thread

1. DDL handoff will be executed in the blocking thread.
2. Auto handoff will be executed in a new non-blocking thread.

This closes #2008


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7f7ea4d7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7f7ea4d7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7f7ea4d7

Branch: refs/heads/master
Commit: 7f7ea4d757d99b681512cb5f1369187f69f905c8
Parents: a0fc0be
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue Feb 27 21:50:20 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Fri Mar 2 09:40:20 2018 +0800

----------------------------------------------------------------------
 .../CarbonAlterTableCompactionCommand.scala        |  2 +-
 .../carbondata/streaming/StreamHandoffRDD.scala    | 17 +++++++++++------
 .../streaming/CarbonAppendableStreamSink.scala     |  3 ++-
 3 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index f6019e4..9b9ca0e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -181,7 +181,7 @@ case class CarbonAlterTableCompactionCommand(
     if (compactionType == CompactionType.STREAMING) {
       StreamHandoffRDD.startStreamingHandoffThread(
         carbonLoadModel,
-        sqlContext.sparkSession)
+        sqlContext.sparkSession, true)
       return
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index b03ee1e..a46ced5 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -279,15 +279,20 @@ object StreamHandoffRDD {
    */
   def startStreamingHandoffThread(
       carbonLoadModel: CarbonLoadModel,
-      sparkSession: SparkSession
+      sparkSession: SparkSession,
+      isDDL: Boolean
   ): Unit = {
-    // start a new thread to execute streaming segment handoff
-    val handoffThread = new Thread() {
-      override def run(): Unit = {
-        iterateStreamingHandoff(carbonLoadModel, sparkSession)
+    if (isDDL) {
+      iterateStreamingHandoff(carbonLoadModel, sparkSession)
+    } else {
+      // start a new thread to execute streaming segment handoff
+      val handoffThread = new Thread() {
+        override def run(): Unit = {
+          iterateStreamingHandoff(carbonLoadModel, sparkSession)
+        }
       }
+      handoffThread.start()
     }
-    handoffThread.start()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index f2f9853..312d24e 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -178,7 +178,8 @@ class CarbonAppendableStreamSink(
       if (enableAutoHandoff) {
         StreamHandoffRDD.startStreamingHandoffThread(
           carbonLoadModel,
-          sparkSession)
+          sparkSession,
+          false)
       }
     }
   }