You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/07/20 10:13:31 UTC
[03/50] [abbrv] incubator-carbondata git commit: [Bug] if activeNodes
contains the list of IPs of active nodes,
node allocation was not working. (#805)
[Bug] if activeNodes contains the list of IPs of active nodes, node allocation was not working. (#805)
if activeNodes contains the list of IPs of active nodes and the block info contains the hostname vs. the number of blocks, then node allocation was not working.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2cc9d949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2cc9d949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2cc9d949
Branch: refs/heads/master
Commit: 2cc9d9493cc9730b0dec6ea9c987299a94d35da6
Parents: 18ef176
Author: Mohammad Shahid Khan <mo...@gmail.com>
Authored: Fri Jul 15 02:30:45 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Fri Jul 15 02:30:45 2016 +0530
----------------------------------------------------------------------
.../carbondata/spark/load/CarbonLoaderUtil.java | 19 ++++++++++++++++++-
.../carbondata/spark/rdd/CarbonQueryRDD.scala | 4 +++-
2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cc9d949/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index f81a006..a9378d2 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -1233,10 +1233,20 @@ public final class CarbonLoaderUtil {
// are assigned first
Collections.sort(multiBlockRelations);
+ Set<String> validActiveNodes = new HashSet<String>();
+ // find all the valid active nodes
for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
String nodeName = nodeMultiBlockRelation.getNode();
//assign the block to the node only if the node is active
- if (null != activeNodes && !isActiveExecutor(activeNodes, nodeName)) {
+ if (null != activeNodes && isActiveExecutor(activeNodes, nodeName)) {
+ validActiveNodes.add(nodeName);
+ }
+ }
+
+ for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+ String nodeName = nodeMultiBlockRelation.getNode();
+ //assign the block to the node only if the node is active
+ if (!validActiveNodes.isEmpty() && !validActiveNodes.contains(nodeName)) {
continue;
}
// this loop will be for each NODE
@@ -1287,6 +1297,13 @@ public final class CarbonLoaderUtil {
} catch (UnknownHostException ue) {
isActiveNode = false;
}
+ } else {
+ try {
+ String hostAddress = InetAddress.getLocalHost().getHostAddress();
+ isActiveNode = activeNode.contains(hostAddress);
+ } catch (UnknownHostException ue) {
+ isActiveNode = false;
+ }
}
return isActiveNode;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cc9d949/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
index 0f81bbd..9d81f0f 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
@@ -119,13 +119,15 @@ class CarbonQueryRDD[V: ClassTag](
confExecutors = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
}
}
+ val startTime = System.currentTimeMillis
val activeNodes = DistributionUtil
.ensureExecutorsAndGetNodeList(requiredExecutors, confExecutors, sparkContext)
val nodeBlockMapping =
CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
activeNodes.toList.asJava
)
-
+ val timeElapsed: Long = System.currentTimeMillis - startTime
+ LOGGER.info("Total Time taken in block allocation : " + timeElapsed)
var i = 0
// Create Spark Partition for each task and assign blocks
nodeBlockMapping.asScala.foreach { entry =>