You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2016/08/23 19:29:10 UTC
[1/2] incubator-carbondata git commit: [CARBONDATA-171] Block
distribution is not proper when the number of active executors is more than
the node size
Repository: incubator-carbondata
Updated Branches:
refs/heads/master c11058d74 -> 1a28ada21
[CARBONDATA-171] Block distribution is not proper when the number of active executors is more than the node size
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/eac5573a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/eac5573a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/eac5573a
Branch: refs/heads/master
Commit: eac5573a644118c4942715f15e629ffa9ca1141b
Parents: c11058d
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Tue Aug 23 20:47:28 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Wed Aug 24 00:39:36 2016 +0530
----------------------------------------------------------------------
.../carbondata/spark/load/CarbonLoaderUtil.java | 40 ++--
.../spark/sql/hive/DistributionUtil.scala | 6 +-
.../spark/load/CarbonLoaderUtilTest.java | 191 +++++++++++++++++++
3 files changed, 215 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eac5573a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 918702d..b7fb4a7 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -1225,9 +1225,11 @@ public final class CarbonLoaderUtil {
List<Distributable> blockLst = outputMap.get(activeNode);
if (null == blockLst) {
blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- outputMap.put(activeNode, blockLst);
}
populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+ if (blockLst.size() > 0) {
+ outputMap.put(activeNode, blockLst);
+ }
}
} else {
for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
@@ -1293,21 +1295,15 @@ public final class CarbonLoaderUtil {
// are assigned first
Collections.sort(multiBlockRelations);
- Set<String> validActiveNodes = new HashSet<String>();
- // find all the valid active nodes
- for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
- String nodeName = nodeMultiBlockRelation.getNode();
- //assign the block to the node only if the node is active
- if (null != activeNodes && isActiveExecutor(activeNodes, nodeName)) {
- validActiveNodes.add(nodeName);
- }
- }
-
for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
String nodeName = nodeMultiBlockRelation.getNode();
//assign the block to the node only if the node is active
- if (!validActiveNodes.isEmpty() && !validActiveNodes.contains(nodeName)) {
- continue;
+ String activeExecutor = nodeName;
+ if (null != activeNodes) {
+ activeExecutor = getActiveExecutor(activeNodes, nodeName);
+ if (null == activeExecutor) {
+ continue;
+ }
}
// this loop will be for each NODE
int nodeCapacity = 0;
@@ -1317,14 +1313,14 @@ public final class CarbonLoaderUtil {
// check if this is already assigned.
if (uniqueBlocks.contains(block)) {
- if (null == outputMap.get(nodeName)) {
+ if (null == outputMap.get(activeExecutor)) {
List<Distributable> list =
new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- outputMap.put(nodeName, list);
+ outputMap.put(activeExecutor, list);
}
// assign this block to this node if node has capacity left
if (nodeCapacity < blocksPerNode) {
- List<Distributable> infos = outputMap.get(nodeName);
+ List<Distributable> infos = outputMap.get(activeExecutor);
infos.add(block);
nodeCapacity++;
uniqueBlocks.remove(block);
@@ -1344,16 +1340,19 @@ public final class CarbonLoaderUtil {
* @param nodeName
* @return returns true if active else false.
*/
- private static boolean isActiveExecutor(List activeNode, String nodeName) {
+ private static String getActiveExecutor(List activeNode, String nodeName) {
boolean isActiveNode = activeNode.contains(nodeName);
if (isActiveNode) {
- return isActiveNode;
+ return nodeName;
}
//if localhost then retrieve the localhost name then do the check
else if (nodeName.equals("localhost")) {
try {
String hostName = InetAddress.getLocalHost().getHostName();
isActiveNode = activeNode.contains(hostName);
+ if(isActiveNode){
+ return hostName;
+ }
} catch (UnknownHostException ue) {
isActiveNode = false;
}
@@ -1361,11 +1360,14 @@ public final class CarbonLoaderUtil {
try {
String hostAddress = InetAddress.getLocalHost().getHostAddress();
isActiveNode = activeNode.contains(hostAddress);
+ if(isActiveNode){
+ return hostAddress;
+ }
} catch (UnknownHostException ue) {
isActiveNode = false;
}
}
- return isActiveNode;
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eac5573a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index ac32aaf..81abbfb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -130,7 +130,7 @@ object DistributionUtil {
confExecutors
} else {nodeMapping.size()}
- var startTime = System.currentTimeMillis();
+ val startTime = System.currentTimeMillis();
CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
var nodes = DistributionUtil.getNodeList(sparkContext)
var maxTimes = 30;
@@ -139,9 +139,9 @@ object DistributionUtil {
nodes = DistributionUtil.getNodeList(sparkContext)
maxTimes = maxTimes - 1;
}
- var timDiff = System.currentTimeMillis() - startTime;
+ val timDiff = System.currentTimeMillis() - startTime;
LOGGER.info("Total Time taken to ensure the required executors : " + timDiff)
LOGGER.info("Time elapsed to allocate the required executors : " + (30 - maxTimes) * 500)
- nodes
+ nodes.distinct
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/eac5573a/integration/spark/src/test/scala/org/apache/carbondata/spark/load/CarbonLoaderUtilTest.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/load/CarbonLoaderUtilTest.java b/integration/spark/src/test/scala/org/apache/carbondata/spark/load/CarbonLoaderUtilTest.java
new file mode 100644
index 0000000..f8ec400
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/load/CarbonLoaderUtilTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.load;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class to test block distribution functionality
+ */
+public class CarbonLoaderUtilTest {
+ List<Distributable> blockInfos = null;
+ int noOfNodesInput = -1;
+ List<String> activeNode = null;
+ Map<String, List<Distributable>> expected = null;
+ Map<String, List<Distributable>> mapOfNodes = null;
+
+ @Test public void nodeBlockMapping() throws Exception {
+
+ // scenario when the 3 nodes and 3 executors
+ initSet1();
+ Map<String, List<Distributable>> mapOfNodes =
+ CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+ // node allocation
+ Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
+ // block allocation
+ boolean isEqual = compareResult(expected, mapOfNodes);
+ Assert.assertTrue("Block Allocation", isEqual);
+
+ // 2 node and 3 executors
+ initSet2();
+ mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+ // node allocation
+ Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
+ // block allocation
+ isEqual = compareResult(expected, mapOfNodes);
+ Assert.assertTrue("Block Allocation", isEqual);
+
+ // 3 data node and 2 executors
+ initSet3();
+ mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
+ // node allocation
+ Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
+ // block allocation
+ isEqual = compareResult(expected, mapOfNodes);
+ Assert.assertTrue("Block Allocation", isEqual);
+ }
+
+ /**
+ * compares the blocks allocation
+ *
+ * @param expectedResult
+ * @param actualResult
+ * @return
+ */
+ private boolean compareResult(Map<String, List<Distributable>> expectedResult,
+ Map<String, List<Distributable>> actualResult) {
+ expectedResult = sortByListSize(expectedResult);
+ actualResult = sortByListSize(actualResult);
+ List<List<Distributable>> expectedList = new LinkedList(expectedResult.entrySet());
+ List<List<Distributable>> mapOfNodesList = new LinkedList(actualResult.entrySet());
+ boolean isEqual = expectedList.size() == mapOfNodesList.size();
+ if (isEqual) {
+ for (int i = 0; i < expectedList.size(); i++) {
+ int size1 = ((List) ((Map.Entry) (expectedList.get(i))).getValue()).size();
+ int size2 = ((List) ((Map.Entry) (mapOfNodesList.get(i))).getValue()).size();
+ isEqual = size1 == size2;
+ if (!isEqual) {
+ break;
+ }
+ }
+ }
+ return isEqual;
+ }
+
+ /**
+ * sort by list size
+ *
+ * @param map
+ * @return
+ */
+ private static Map<String, List<Distributable>> sortByListSize(
+ Map<String, List<Distributable>> map) {
+ List<List<Distributable>> list = new LinkedList(map.entrySet());
+ Collections.sort(list, new Comparator() {
+ public int compare(Object obj1, Object obj2) {
+ if (obj1 == null && obj2 == null) {
+ return 0;
+ } else if (obj1 == null) {
+ return 1;
+ } else if (obj2 == null) {
+ return -1;
+ }
+ int size1 = ((List) ((Map.Entry) (obj1)).getValue()).size();
+ int size2 = ((List) ((Map.Entry) (obj2)).getValue()).size();
+ return size2 - size1;
+ }
+ });
+
+ Map res = new LinkedHashMap();
+ for (Iterator it = list.iterator(); it.hasNext(); ) {
+ Map.Entry entry = (Map.Entry) it.next();
+ res.put(entry.getKey(), entry.getValue());
+ }
+ return res;
+ }
+
+ void initSet1() {
+ blockInfos = new ArrayList<>();
+ activeNode = new ArrayList<>();
+ activeNode.add("node-7");
+ activeNode.add("node-9");
+ activeNode.add("node-11");
+ String[] location = { "node-7", "node-9", "node-11" };
+ blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
+ expected = new HashMap<>();
+ expected.put("node-7", blockInfos.subList(0, 2));
+ expected.put("node-9", blockInfos.subList(2, 4));
+ expected.put("node-11", blockInfos.subList(4, 6));
+ }
+
+ void initSet2() {
+ blockInfos = new ArrayList<>();
+ activeNode = new ArrayList<>();
+ activeNode.add("node-7");
+ activeNode.add("node-9");
+ activeNode.add("node-11");
+ String[] location = { "node-7", "node-11" };
+ blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
+ expected = new HashMap<>();
+ expected.put("node-7", blockInfos.subList(0, 2));
+ expected.put("node-9", blockInfos.subList(2, 4));
+ expected.put("node-11", blockInfos.subList(4, 6));
+ }
+
+ void initSet3() {
+ blockInfos = new ArrayList<>();
+ activeNode = new ArrayList<>();
+ activeNode.add("node-7");
+ activeNode.add("node-11");
+ String[] location = { "node-7", "node-9", "node-11" };
+ blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
+ blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
+ expected = new HashMap<>();
+ expected.put("node-7", blockInfos.subList(0, 3));
+ expected.put("node-11", blockInfos.subList(3, 6));
+ }
+}
\ No newline at end of file
[2/2] incubator-carbondata git commit: [CARBONDATA-171] This closes
#87
Posted by gv...@apache.org.
[CARBONDATA-171] This closes #87
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/1a28ada2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/1a28ada2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/1a28ada2
Branch: refs/heads/master
Commit: 1a28ada21af0f0ff975c93252fdbec959974e542
Parents: c11058d eac5573
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Wed Aug 24 00:58:45 2016 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Wed Aug 24 00:58:45 2016 +0530
----------------------------------------------------------------------
.../carbondata/spark/load/CarbonLoaderUtil.java | 40 ++--
.../spark/sql/hive/DistributionUtil.scala | 6 +-
.../spark/load/CarbonLoaderUtilTest.java | 191 +++++++++++++++++++
3 files changed, 215 insertions(+), 22 deletions(-)
----------------------------------------------------------------------