You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:35 UTC

[37/47] incubator-carbondata git commit: [CARBONDATA-122] Block distribution give second and third preferred locaiton (#887)

[CARBONDATA-122] Block distribution give second and third preferred locaiton (#887)

  Give second and third option for node distribution
 Support second and third locations option for dataload


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/066f74b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/066f74b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/066f74b6

Branch: refs/heads/master
Commit: 066f74b649257b02e0e0b07c34ddc61661f267b4
Parents: 5341c7d
Author: Venkata Ramana G <g....@gmail.com>
Authored: Fri Jul 29 07:57:02 2016 +0530
Committer: sujith71955 <su...@gmail.com>
Committed: Fri Jul 29 07:57:02 2016 +0530

----------------------------------------------------------------------
 .../org/carbondata/core/load/BlockDetails.java  |  8 ++++-
 .../hadoop/test/util/StoreCreator.java          |  2 +-
 .../org/apache/spark/util/SplitUtils.scala      |  6 +++-
 .../spark/rdd/CarbonDataLoadRDD.scala           | 29 +++++++++++++++--
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../carbondata/spark/rdd/CarbonQueryRDD.scala   | 33 +++++++++++++++++---
 6 files changed, 69 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/core/src/main/java/org/carbondata/core/load/BlockDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/load/BlockDetails.java b/core/src/main/java/org/carbondata/core/load/BlockDetails.java
index 9f0e13b..39c39a0 100644
--- a/core/src/main/java/org/carbondata/core/load/BlockDetails.java
+++ b/core/src/main/java/org/carbondata/core/load/BlockDetails.java
@@ -38,11 +38,14 @@ public class BlockDetails implements Serializable {
   private long blockLength;
   //file path which block belong to
   private String filePath;
+  // locations where this block exists
+  private String[] locations;
 
-  public BlockDetails(String filePath, long blockOffset, long blockLength) {
+  public BlockDetails(String filePath, long blockOffset, long blockLength, String[] locations) {
     this.filePath = filePath;
     this.blockOffset = blockOffset;
     this.blockLength = blockLength;
+    this.locations = locations;
   }
 
   public long getBlockOffset() {
@@ -69,4 +72,7 @@ public class BlockDetails implements Serializable {
     this.filePath = filePath;
   }
 
+  public String[] getLocations() {
+    return locations;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
index c419c27..15b71ab 100644
--- a/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/carbondata/hadoop/test/util/StoreCreator.java
@@ -365,7 +365,7 @@ public class StoreCreator {
     schmaModel.setCsvFilePath(loadModel.getFactFilePath());
     SchemaInfo info = new SchemaInfo();
     BlockDetails blockDetails = new BlockDetails(loadModel.getFactFilePath(),
-        0, new File(loadModel.getFactFilePath()).length());
+        0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
     GraphGenerator.blockInfo.put("qwqwq", new BlockDetails[] { blockDetails });
     schmaModel.setBlocksID("qwqwq");
     schmaModel.setEscapeCharacter("\\");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
index c126f0e..296ce91 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
@@ -56,7 +56,11 @@ object SplitUtils {
         part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
       }
       splits.map { block =>
-        new BlockDetails(block.getPath.toString, block.getStart, block.getLength)
+        new BlockDetails(block.getPath.toString,
+          block.getStart,
+          block.getLength,
+          block.getLocations
+        )
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 36a1c0f..2a19da9 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -19,6 +19,7 @@
 package org.carbondata.spark.rdd
 
 import java.lang.Long
+import java.util
 import java.util.UUID
 
 import scala.collection.JavaConverters._
@@ -452,9 +453,31 @@ class CarbonDataLoadRDD[K, V](
       case false =>
         // for node partition
         val theSplit = split.asInstanceOf[CarbonNodePartition]
-        val location: Seq[String] = List(theSplit.serializableHadoopSplit)
-        logInfo("Prefered Location for split : " + location(0))
-        location
+        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+        logInfo("Preferred Location for split : " + firstOptionLocation(0))
+        val blockMap = new util.LinkedHashMap[String, Integer]()
+        val tableBlocks = theSplit.blocksDetails
+        tableBlocks.foreach(tableBlock => tableBlock.getLocations.foreach(
+          location => {
+            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
+              val currentCount = blockMap.get(location)
+              if (currentCount == null) {
+                blockMap.put(location, 1)
+              } else {
+                blockMap.put(location, currentCount + 1)
+              }
+            }
+          }
+        )
+        )
+
+        val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+          nodeCount1.getValue > nodeCount2.getValue
+        }
+        )
+
+        val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+        firstOptionLocation ++ sortedNodesList
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 54cf1bd..ab77ea9 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -768,7 +768,7 @@ object CarbonDataRDDFactory extends Logging {
               entry._2.asScala.map(distributable => {
                 val tableBlock = distributable.asInstanceOf[TableBlockInfo]
                 new BlockDetails(tableBlock.getFilePath,
-                  tableBlock.getBlockOffset, tableBlock.getBlockLength
+                  tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
                 )
               }).toArray
             (entry._1, blockDetailsList)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/066f74b6/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
index 50d3ca7..9eb680c 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
@@ -277,8 +277,33 @@ class CarbonQueryRDD[V: ClassTag](
    /**
     * Get the preferred locations where to launch this task.
     */
-  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    val theSplit = partition.asInstanceOf[CarbonSparkPartition]
-    theSplit.locations.filter(_ != "localhost")
-  }
+   override def getPreferredLocations(partition: Partition): Seq[String] = {
+     val theSplit = partition.asInstanceOf[CarbonSparkPartition]
+     val firstOptionLocation = theSplit.locations.filter(_ != "localhost")
+     val tableBlocks = theSplit.tableBlockInfos
+     // node name and count mapping
+     val blockMap = new util.LinkedHashMap[String, Integer]()
+
+     tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach(
+       location => {
+         if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
+           val currentCount = blockMap.get(location)
+           if (currentCount == null) {
+             blockMap.put(location, 1)
+           } else {
+             blockMap.put(location, currentCount + 1)
+           }
+         }
+       }
+     )
+     )
+
+     val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
+       nodeCount1.getValue > nodeCount2.getValue
+     }
+     )
+
+     val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+     firstOptionLocation ++ sortedNodesList
+   }
 }