You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2016/08/22 13:35:31 UTC

[1/2] incubator-carbondata git commit: CARBONDATA-153 Record count is not matching while loading the data when one data node went down in HA setup

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 04c45b1ee -> 62c0b05e6


CARBONDATA-153 Record count is not matching while loading the data when one data node went down in HA setup


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5e1235ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5e1235ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5e1235ae

Branch: refs/heads/master
Commit: 5e1235ae317ea1915f3cba24f57fad6634754af3
Parents: 04c45b1
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Tue Aug 9 10:47:02 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Aug 22 18:04:44 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java | 55 ++++++++++++++------
 1 file changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5e1235ae/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index f663b06..b934b1d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -1118,7 +1118,7 @@ public final class CarbonLoaderUtil {
     createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
 
     // if any blocks remain then assign them to nodes in round robin.
-    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode);
+    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
 
     return nodeBlocksMap;
   }
@@ -1200,23 +1200,23 @@ public final class CarbonLoaderUtil {
    * @param uniqueBlocks
    */
   private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
-      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode) {
-
-    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
-      Iterator<Distributable> blocks = uniqueBlocks.iterator();
-      List<Distributable> blockLst = entry.getValue();
-      //if the node is already having the per block nodes then avoid assign the extra blocks
-      if (blockLst.size() == noOfBlocksPerNode) {
-        continue;
-      }
-      while (blocks.hasNext()) {
-        Distributable block = blocks.next();
-        blockLst.add(block);
-        blocks.remove();
-        if (blockLst.size() >= noOfBlocksPerNode) {
-          break;
+      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
+
+    if (activeNodes != null) {
+      for (String activeNode : activeNodes) {
+        List<Distributable> blockLst = outputMap.get(activeNode);
+        if (null == blockLst) {
+          blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          outputMap.put(activeNode, blockLst);
         }
+        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+      }
+    } else {
+      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+        List<Distributable> blockLst = entry.getValue();
+        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
       }
+
     }
 
     for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
@@ -1231,6 +1231,29 @@ public final class CarbonLoaderUtil {
   }
 
   /**
+   * The method populate the blockLst to be allocate to a specific node.
+   * @param uniqueBlocks
+   * @param noOfBlocksPerNode
+   * @param blockLst
+   */
+  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
+      List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = uniqueBlocks.iterator();
+    //if the node is already having the per block nodes then avoid assign the extra blocks
+    if (blockLst.size() == noOfBlocksPerNode) {
+      return;
+    }
+    while (blocks.hasNext()) {
+      Distributable block = blocks.next();
+      blockLst.add(block);
+      blocks.remove();
+      if (blockLst.size() >= noOfBlocksPerNode) {
+        break;
+      }
+    }
+  }
+
+  /**
    * To create the final output of the Node and Data blocks
    *
    * @param outputMap


[2/2] incubator-carbondata git commit: [CARBONDATA-153] This closes #77

Posted by gv...@apache.org.
[CARBONDATA-153] This closes #77


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/62c0b05e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/62c0b05e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/62c0b05e

Branch: refs/heads/master
Commit: 62c0b05e62e3c2cadc03e6355bd587d14eab355c
Parents: 04c45b1 5e1235a
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Mon Aug 22 18:59:06 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Aug 22 18:59:06 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java | 55 ++++++++++++++------
 1 file changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------