You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/03/02 01:41:30 UTC
carbondata git commit: [CARBONDATA-2211] in case of DDL HandOff
should not be execute in thread
Repository: carbondata
Updated Branches:
refs/heads/master a0fc0be02 -> 7f7ea4d75
[CARBONDATA-2211] in case of DDL HandOff should not be execute in thread
1. DDL handoff will be executed in the blocking thread.
2. Auto handoff will be executed in a new non-blocking thread.
This closes #2008
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7f7ea4d7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7f7ea4d7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7f7ea4d7
Branch: refs/heads/master
Commit: 7f7ea4d757d99b681512cb5f1369187f69f905c8
Parents: a0fc0be
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue Feb 27 21:50:20 2018 +0530
Committer: QiangCai <qi...@qq.com>
Committed: Fri Mar 2 09:40:20 2018 +0800
----------------------------------------------------------------------
.../CarbonAlterTableCompactionCommand.scala | 2 +-
.../carbondata/streaming/StreamHandoffRDD.scala | 17 +++++++++++------
.../streaming/CarbonAppendableStreamSink.scala | 3 ++-
3 files changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index f6019e4..9b9ca0e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -181,7 +181,7 @@ case class CarbonAlterTableCompactionCommand(
if (compactionType == CompactionType.STREAMING) {
StreamHandoffRDD.startStreamingHandoffThread(
carbonLoadModel,
- sqlContext.sparkSession)
+ sqlContext.sparkSession, true)
return
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index b03ee1e..a46ced5 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -279,15 +279,20 @@ object StreamHandoffRDD {
*/
def startStreamingHandoffThread(
carbonLoadModel: CarbonLoadModel,
- sparkSession: SparkSession
+ sparkSession: SparkSession,
+ isDDL: Boolean
): Unit = {
- // start a new thread to execute streaming segment handoff
- val handoffThread = new Thread() {
- override def run(): Unit = {
- iterateStreamingHandoff(carbonLoadModel, sparkSession)
+ if (isDDL) {
+ iterateStreamingHandoff(carbonLoadModel, sparkSession)
+ } else {
+ // start a new thread to execute streaming segment handoff
+ val handoffThread = new Thread() {
+ override def run(): Unit = {
+ iterateStreamingHandoff(carbonLoadModel, sparkSession)
+ }
}
+ handoffThread.start()
}
- handoffThread.start()
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f7ea4d7/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index f2f9853..312d24e 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -178,7 +178,8 @@ class CarbonAppendableStreamSink(
if (enableAutoHandoff) {
StreamHandoffRDD.startStreamingHandoffThread(
carbonLoadModel,
- sparkSession)
+ sparkSession,
+ false)
}
}
}