You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2016/08/22 13:35:31 UTC
[1/2] incubator-carbondata git commit: CARBONDATA-153 Record count is
not matching while loading the data when one data node went down in HA setup
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 04c45b1ee -> 62c0b05e6
CARBONDATA-153 Record count is not matching while loading the data when one data node went down in HA setup
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5e1235ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5e1235ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5e1235ae
Branch: refs/heads/master
Commit: 5e1235ae317ea1915f3cba24f57fad6634754af3
Parents: 04c45b1
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Tue Aug 9 10:47:02 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Aug 22 18:04:44 2016 +0530
----------------------------------------------------------------------
.../carbondata/spark/load/CarbonLoaderUtil.java | 55 ++++++++++++++------
1 file changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/5e1235ae/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index f663b06..b934b1d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -1118,7 +1118,7 @@ public final class CarbonLoaderUtil {
createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
// if any blocks remain then assign them to nodes in round robin.
- assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode);
+ assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
return nodeBlocksMap;
}
@@ -1200,23 +1200,23 @@ public final class CarbonLoaderUtil {
* @param uniqueBlocks
*/
private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
- Set<Distributable> uniqueBlocks, int noOfBlocksPerNode) {
-
- for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
- Iterator<Distributable> blocks = uniqueBlocks.iterator();
- List<Distributable> blockLst = entry.getValue();
- //if the node is already having the per block nodes then avoid assign the extra blocks
- if (blockLst.size() == noOfBlocksPerNode) {
- continue;
- }
- while (blocks.hasNext()) {
- Distributable block = blocks.next();
- blockLst.add(block);
- blocks.remove();
- if (blockLst.size() >= noOfBlocksPerNode) {
- break;
+ Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
+
+ if (activeNodes != null) {
+ for (String activeNode : activeNodes) {
+ List<Distributable> blockLst = outputMap.get(activeNode);
+ if (null == blockLst) {
+ blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ outputMap.put(activeNode, blockLst);
}
+ populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+ }
+ } else {
+ for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
+ List<Distributable> blockLst = entry.getValue();
+ populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
}
+
}
for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
@@ -1231,6 +1231,29 @@ public final class CarbonLoaderUtil {
}
/**
+ * The method populate the blockLst to be allocate to a specific node.
+ * @param uniqueBlocks
+ * @param noOfBlocksPerNode
+ * @param blockLst
+ */
+ private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
+ List<Distributable> blockLst) {
+ Iterator<Distributable> blocks = uniqueBlocks.iterator();
+ //if the node is already having the per block nodes then avoid assign the extra blocks
+ if (blockLst.size() == noOfBlocksPerNode) {
+ return;
+ }
+ while (blocks.hasNext()) {
+ Distributable block = blocks.next();
+ blockLst.add(block);
+ blocks.remove();
+ if (blockLst.size() >= noOfBlocksPerNode) {
+ break;
+ }
+ }
+ }
+
+ /**
* To create the final output of the Node and Data blocks
*
* @param outputMap
[2/2] incubator-carbondata git commit: [CARBONDATA-153] This closes
#77
Posted by gv...@apache.org.
[CARBONDATA-153] This closes #77
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/62c0b05e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/62c0b05e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/62c0b05e
Branch: refs/heads/master
Commit: 62c0b05e62e3c2cadc03e6355bd587d14eab355c
Parents: 04c45b1 5e1235a
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Mon Aug 22 18:59:06 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Aug 22 18:59:06 2016 +0530
----------------------------------------------------------------------
.../carbondata/spark/load/CarbonLoaderUtil.java | 55 ++++++++++++++------
1 file changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------