Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/07/20 10:13:31 UTC

[03/50] [abbrv] incubator-carbondata git commit: [Bug] If activeNodes contains the list of IPs of active nodes, node allocation was not working. (#805)

[Bug] If activeNodes contains the list of IPs of active nodes, node allocation was not working. (#805)

If activeNodes contains the list of IPs of active nodes and the block info contains hostname vs. number-of-blocks mappings, then node allocation was not working.
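
The root cause is a mismatch of node identifiers: activeNodes may hold executor IP addresses while the block metadata holds hostnames, so a plain string comparison never matches. A minimal sketch of the matching idea, using a hypothetical helper (ActiveNodeMatcher is not part of CarbonData), that checks a node name against the active list first as-is and then by its resolved IP:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

// Hypothetical helper, not CarbonData's actual API: checks whether a node
// name from the block metadata matches an entry in activeNodes, even when
// the metadata carries hostnames while activeNodes carries IP addresses.
public final class ActiveNodeMatcher {

  public static boolean matches(List<String> activeNodes, String nodeName) {
    // Direct match: both sides use the same identifier form.
    if (activeNodes.contains(nodeName)) {
      return true;
    }
    // Block metadata has a hostname but activeNodes has IPs: resolve and retry.
    try {
      String ip = InetAddress.getByName(nodeName).getHostAddress();
      return activeNodes.contains(ip);
    } catch (UnknownHostException e) {
      return false;
    }
  }
}

The committed change below takes a related approach: it precomputes the set of valid active nodes before assignment and extends isActiveExecutor to also compare against the local host's IP address.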

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2cc9d949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2cc9d949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2cc9d949

Branch: refs/heads/master
Commit: 2cc9d9493cc9730b0dec6ea9c987299a94d35da6
Parents: 18ef176
Author: Mohammad Shahid Khan <mo...@gmail.com>
Authored: Fri Jul 15 02:30:45 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Fri Jul 15 02:30:45 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java  | 19 ++++++++++++++++++-
 .../carbondata/spark/rdd/CarbonQueryRDD.scala    |  4 +++-
 2 files changed, 21 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cc9d949/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
index f81a006..a9378d2 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/load/CarbonLoaderUtil.java
@@ -1233,10 +1233,20 @@ public final class CarbonLoaderUtil {
     // are assigned first
     Collections.sort(multiBlockRelations);
 
+    Set<String> validActiveNodes = new HashSet<String>();
+    // find all the valid active nodes
     for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
       String nodeName = nodeMultiBlockRelation.getNode();
       //assign the block to the node only if the node is active
-      if (null != activeNodes && !isActiveExecutor(activeNodes, nodeName)) {
+      if (null != activeNodes && isActiveExecutor(activeNodes, nodeName)) {
+        validActiveNodes.add(nodeName);
+      }
+    }
+
+    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
+      String nodeName = nodeMultiBlockRelation.getNode();
+      //assign the block to the node only if the node is active
+      if (!validActiveNodes.isEmpty() && !validActiveNodes.contains(nodeName)) {
         continue;
       }
       // this loop will be for each NODE
@@ -1287,6 +1297,13 @@ public final class CarbonLoaderUtil {
       } catch (UnknownHostException ue) {
         isActiveNode = false;
       }
+    } else {
+      try {
+        String hostAddress = InetAddress.getLocalHost().getHostAddress();
+        isActiveNode = activeNode.contains(hostAddress);
+      } catch (UnknownHostException ue) {
+        isActiveNode = false;
+      }
     }
     return isActiveNode;
   }
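
For readers skimming the hunks above: the fix splits node filtering into two passes. The first pass collects every node that passes the active-executor check into validActiveNodes; the second pass skips a node only when that set is non-empty and does not contain it, so allocation still proceeds when no node could be validated. A minimal standalone sketch of the same idea, using simplified stand-in types rather than NodeMultiBlockRelation (TwoPassNodeFilter is illustrative only):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiPredicate;

// Illustrative sketch, not CarbonData code: two-pass selection of the nodes
// eligible for block assignment.
public final class TwoPassNodeFilter {

  public static List<String> assignableNodes(List<String> candidateNodes,
      List<String> activeNodes, BiPredicate<List<String>, String> isActive) {
    // Pass 1: collect every candidate the active-executor check accepts.
    Set<String> validActiveNodes = new HashSet<>();
    for (String node : candidateNodes) {
      if (activeNodes != null && isActive.test(activeNodes, node)) {
        validActiveNodes.add(node);
      }
    }
    // Pass 2: keep a node only if validation produced a non-empty set and
    // the node is in it; an empty set means "do not filter at all".
    List<String> assignable = new ArrayList<>();
    for (String node : candidateNodes) {
      if (!validActiveNodes.isEmpty() && !validActiveNodes.contains(node)) {
        continue;
      }
      assignable.add(node);
    }
    return assignable;
  }
}

A predicate such as the matcher sketched earlier (ActiveNodeMatcher::matches) could be passed as isActive.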

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cc9d949/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
index 0f81bbd..9d81f0f 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
@@ -119,13 +119,15 @@ class CarbonQueryRDD[V: ClassTag](
             confExecutors = sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
           }
         }
+        val startTime = System.currentTimeMillis
         val activeNodes = DistributionUtil
           .ensureExecutorsAndGetNodeList(requiredExecutors, confExecutors, sparkContext)
         val nodeBlockMapping =
           CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism,
             activeNodes.toList.asJava
           )
-
+        val timeElapsed: Long = System.currentTimeMillis - startTime
+        LOGGER.info("Total Time taken in block allocation : " + timeElapsed)
         var i = 0
         // Create Spark Partition for each task and assign blocks
         nodeBlockMapping.asScala.foreach { entry =>
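
The Scala change above only adds instrumentation: it records System.currentTimeMillis before and after the node-block mapping call and logs the elapsed time. A minimal Java sketch of the same pattern (doAllocation is a placeholder, not CarbonData code):

public final class AllocationTimer {

  public static void main(String[] args) {
    long startTime = System.currentTimeMillis();
    doAllocation(); // stands in for CarbonLoaderUtil.nodeBlockTaskMapping(...)
    long timeElapsed = System.currentTimeMillis() - startTime;
    System.out.println("Total Time taken in block allocation : " + timeElapsed);
  }

  private static void doAllocation() {
    // Placeholder for the block-allocation work being timed.
  }
}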