You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/09/22 05:36:06 UTC
[08/50] [abbrv] incubator-carbondata git commit: [CARBONDATA-227] In
block distribution, parallelism is decided initially and not re-initialized
after requesting new executors. Due to this, the per-node task
initialization is computed incorrectly.
[CARBONDATA-227] In block distribution, parallelism is decided initially and not re-initialized after requesting new executors. Due to this, the per-node task initialization is computed incorrectly.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/acb1d979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/acb1d979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/acb1d979
Branch: refs/heads/branch-0.1
Commit: acb1d979e4f61bf727f69ce553f0f9cad0954ae8
Parents: e1f34cc
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Thu Sep 8 10:37:49 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Sep 22 09:34:05 2016 +0530
----------------------------------------------------------------------
.../apache/carbondata/spark/rdd/CarbonMergerRDD.scala | 4 ++--
.../org/apache/carbondata/spark/rdd/CarbonScanRDD.scala | 11 ++++++++---
2 files changed, 10 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/acb1d979/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index f160fd9..54d7539 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -55,7 +55,6 @@ class CarbonMergerRDD[K, V](
confExecutorsTemp: String)
extends RDD[(K, V)](sc, Nil) with Logging {
- val defaultParallelism = sc.defaultParallelism
sc.setLocalProperty("spark.scheduler.pool", "DDL")
sc.setLocalProperty("spark.job.interruptOnCancel", "true")
@@ -228,6 +227,7 @@ class CarbonMergerRDD[K, V](
)
val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier)
+ var defaultParallelism = sparkContext.defaultParallelism
val result = new util.ArrayList[Partition](defaultParallelism)
// mapping of the node and block list.
@@ -299,7 +299,7 @@ class CarbonMergerRDD[K, V](
maxTimes = maxTimes - 1
}
logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis")
-
+ defaultParallelism = sparkContext.defaultParallelism
var i = 0
val nodeTaskBlocksMap: util.Map[String, util.List[NodeInfo]] = new util.HashMap[String, util
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/acb1d979/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 5f50644..497d9f8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -73,9 +73,9 @@ class CarbonScanRDD[V: ClassTag](
baseStoreLocation: String)
extends RDD[V](sc, Nil) with Logging {
- val defaultParallelism = sc.defaultParallelism
override def getPartitions: Array[Partition] = {
+ var defaultParallelism = sparkContext.defaultParallelism
val statisticRecorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance()
val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
@@ -110,13 +110,18 @@ class CarbonScanRDD[V: ClassTag](
new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets)
)
)
+ var activeNodes = Array[String]()
+ if(blockListTemp.nonEmpty) {
+ activeNodes = DistributionUtil
+ .ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext)
+ }
+ defaultParallelism = sparkContext.defaultParallelism
val blockList = CarbonLoaderUtil.
distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala
+
if (blockList.nonEmpty) {
var statistic = new QueryStatistic()
// group blocks to nodes, tasks
- val activeNodes = DistributionUtil
- .ensureExecutorsAndGetNodeList(blockList.toArray, sparkContext)
val nodeBlockMapping =
CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
activeNodes.toList.asJava