You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/09/22 05:36:06 UTC

[08/50] [abbrv] incubator-carbondata git commit: [CARBONDATA-227] In block distribution parralelism is decided initially and not re initialized after requesting new executors. Due to this task per node initialization is getting wrong.

[CARBONDATA-227] In block distribution parralelism is decided initially and not re initialized after requesting new executors. Due to this task per node initialization is getting wrong.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/acb1d979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/acb1d979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/acb1d979

Branch: refs/heads/branch-0.1
Commit: acb1d979e4f61bf727f69ce553f0f9cad0954ae8
Parents: e1f34cc
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Thu Sep 8 10:37:49 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Sep 22 09:34:05 2016 +0530

----------------------------------------------------------------------
 .../apache/carbondata/spark/rdd/CarbonMergerRDD.scala    |  4 ++--
 .../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala  | 11 ++++++++---
 2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/acb1d979/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f160fd9..54d7539 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -55,7 +55,6 @@ class CarbonMergerRDD[K, V](
   confExecutorsTemp: String)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
-  val defaultParallelism = sc.defaultParallelism
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")
 
@@ -228,6 +227,7 @@ class CarbonMergerRDD[K, V](
     )
     val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
       QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
+    var defaultParallelism = sparkContext.defaultParallelism
     val result = new util.ArrayList[Partition](defaultParallelism)
 
     // mapping of the node and block list.
@@ -299,7 +299,7 @@ class CarbonMergerRDD[K, V](
       maxTimes = maxTimes - 1
     }
     logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis")
-
+    defaultParallelism = sparkContext.defaultParallelism
     var i = 0
 
     val nodeTaskBlocksMap: util.Map[String, util.List[NodeInfo]] = new util.HashMap[String, util

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/acb1d979/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 5f50644..497d9f8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -73,9 +73,9 @@ class CarbonScanRDD[V: ClassTag](
     baseStoreLocation: String)
   extends RDD[V](sc, Nil) with Logging {
 
-  val defaultParallelism = sc.defaultParallelism
 
   override def getPartitions: Array[Partition] = {
+    var defaultParallelism = sparkContext.defaultParallelism
     val statisticRecorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance()
     val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
       QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
@@ -110,13 +110,18 @@ class CarbonScanRDD[V: ClassTag](
           new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets)
         )
       )
+      var activeNodes = Array[String]()
+      if(blockListTemp.nonEmpty) {
+         activeNodes = DistributionUtil
+          .ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext)
+      }
+      defaultParallelism = sparkContext.defaultParallelism
       val blockList = CarbonLoaderUtil.
         distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala
+
       if (blockList.nonEmpty) {
         var statistic = new QueryStatistic()
         // group blocks to nodes, tasks
-        val activeNodes = DistributionUtil
-          .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext)
         val nodeBlockMapping =
           CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
             activeNodes.toList.asJava