You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:35 UTC
[37/47] incubator-carbondata git commit: [CARBONDATA-122] Block
distribution give second and third preferred location (#887)
[CARBONDATA-122] Block distribution give second and third preferred location (#887)
Give second and third option for node distribution
Support second and third locations option for dataload
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/066f74b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/066f74b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/066f74b6
Branch: refs/heads/master
Commit: 066f74b649257b02e0e0b07c34ddc61661f267b4
Parents: 5341c7d
Author: Venkata Ramana G <g....@gmail.com>
Authored: Fri Jul 29 07:57:02 2016 +0530
Committer: sujith71955 <su...@gmail.com>
Committed: Fri Jul 29 07:57:02 2016 +0530
----------------------------------------------------------------------
.../org/carbondata/core/load/BlockDetails.java | 8 ++++-
.../hadoop/test/util/StoreCreator.java | 2 +-
.../org/apache/spark/util/SplitUtils.scala | 6 +++-
.../spark/rdd/CarbonDataLoadRDD.scala | 29 +++++++++++++++--
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../carbondata/spark/rdd/CarbonQueryRDD.scala | 33 +++++++++++++++++---
6 files changed, 69 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/core/src/main/java/org/carbondata/core/load/BlockDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/load/BlockDetails.java b/core/src/main/java/org/carbondata/core/load/BlockDetails.java
index 9f0e13b..39c39a0 100644
--- a/core/src/main/java/org/carbondata/core/load/BlockDetails.java
+++ b/core/src/main/java/org/carbondata/core/load/BlockDetails.java
@@ -38,11 +38,14 @@ public class BlockDetails implements Serializable {
private long blockLength;
//file path which block belong to
private String filePath;
+ // locations where this block exists
+ private String[] locations;
- public BlockDetails(String filePath, long blockOffset, long blockLength) {
+ public BlockDetails(String filePath, long blockOffset, long blockLength, String[] locations) {
this.filePath = filePath;
this.blockOffset = blockOffset;
this.blockLength = blockLength;
+ this.locations = locations;
}
public long getBlockOffset() {
@@ -69,4 +72,7 @@ public class BlockDetails implements Serializable {
this.filePath = filePath;
}
+ public String[] getLocations() {
+ return locations;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
index c419c27..15b71ab 100644
--- a/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
@@ -365,7 +365,7 @@ public class StoreCreator {
schmaModel.setCsvFilePath(loadModel.getFactFilePath());
SchemaInfo info = new SchemaInfo();
BlockDetails blockDetails = new BlockDetails(loadModel.getFactFilePath(),
- 0, new File(loadModel.getFactFilePath()).length());
+ 0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
GraphGenerator.blockInfo.put("qwqwq", new BlockDetails[] { blockDetails });
schmaModel.setBlocksID("qwqwq");
schmaModel.setEscapeCharacter("\\");
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
index c126f0e..296ce91 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
@@ -56,7 +56,11 @@ object SplitUtils {
part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
}
splits.map { block =>
- new BlockDetails(block.getPath.toString, block.getStart, block.getLength)
+ new BlockDetails(block.getPath.toString,
+ block.getStart,
+ block.getLength,
+ block.getLocations
+ )
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 36a1c0f..2a19da9 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -19,6 +19,7 @@
package org.carbondata.spark.rdd
import java.lang.Long
+import java.util
import java.util.UUID
import scala.collection.JavaConverters._
@@ -452,9 +453,31 @@ class CarbonDataLoadRDD[K, V](
case false =>
// for node partition
val theSplit = split.asInstanceOf[CarbonNodePartition]
- val location: Seq[String] = List(theSplit.serializableHadoopSplit)
- logInfo("Prefered Location for split : " + location(0))
- location
+ val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+ logInfo("Preferred Location for split : " + firstOptionLocation(0))
+ val blockMap = new util.LinkedHashMap[String, Integer]()
+ val tableBlocks = theSplit.blocksDetails
+ tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
+ location => {
+ if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
+ val currentCount = blockMap.get(location)
+ if (currentCount == null) {
+ blockMap.put(location, 1)
+ } else {
+ blockMap.put(location, currentCount + 1)
+ }
+ }
+ }
+ )
+ )
+
+ val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+ nodeCount1.getValue > nodeCount2.getValue
+ }
+ )
+
+ val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+ firstOptionLocation ++ sortedNodesList
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 54cf1bd..ab77ea9 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -768,7 +768,7 @@ object CarbonDataRDDFactory extends Logging {
entry._2.asScala.map(distributable => {
val tableBlock = distributable.asInstanceOf[TableBlockInfo]
new BlockDetails(tableBlock.getFilePath,
- tableBlock.getBlockOffset, tableBlock.getBlockLength
+ tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
)
}).toArray
(entry._1, blockDetailsList)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
index 50d3ca7..9eb680c 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
@@ -277,8 +277,33 @@ class CarbonQueryRDD[V: ClassTag](
/**
* Get the preferred locations where to launch this task.
*/
- override def getPreferredLocations(partition: Partition): Seq[String] = {
- val theSplit = partition.asInstanceOf[CarbonSparkPartition]
- theSplit.locations.filter(_ != "localhost")
- }
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ val theSplit = partition.asInstanceOf[CarbonSparkPartition]
+ val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
+ val tableBlocks = theSplit.tableBlockInfos
+ // node name and count mapping
+ val blockMap = new util.LinkedHashMap[String, Integer]()
+
+ tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
+ location => {
+ if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
+ val currentCount = blockMap.get(location)
+ if (currentCount == null) {
+ blockMap.put(location, 1)
+ } else {
+ blockMap.put(location, currentCount + 1)
+ }
+ }
+ }
+ )
+ )
+
+ val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+ nodeCount1.getValue > nodeCount2.getValue
+ }
+ )
+
+ val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+ firstOptionLocation ++ sortedNodesList
+ }
}