You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/09/13 07:22:20 UTC

[carbondata] branch master updated: [CARBONDATA-3982] Use Partition instead of Span to split legacy and non-legacy segments for executor distribution in indexserver

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0527b76  [CARBONDATA-3982] Use Partition instead of Span to split legacy and non-legacy segments for executor distribution in indexserver
0527b76 is described below

commit 0527b7603e97a926e81e1dcf55384d9453c6898e
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Thu Sep 10 15:01:12 2020 +0530

    [CARBONDATA-3982] Use Partition instead of Span to split legacy and
    non-legacy segments for executor distribution in indexserver
    
    Why is this PR needed?
    When span is used to split the input into non-legacy and legacy segments,
    it stops at the first legacy segment (index size zero). If a legacy segment
    comes first, the non-legacy segments list is empty.
    
    What changes were proposed in this PR?
    Use segments.partition instead of span to split legacy segments and non-legacy segments
    
    This closes #3918
---
 .../indexserver/DistributedRDDUtils.scala          |  7 +++--
 .../indexserver/DistributedRDDUtilsTest.scala      | 34 +++++++++++++++++++++-
 2 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 149a30c..c3a84fa 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -50,7 +50,7 @@ object DistributedRDDUtils {
   def getExecutors(segment: Array[InputSplit], executorsList : Map[String, Seq[String]],
       tableUniqueName: String, rddId: Int): Seq[Partition] = {
     // sort the partitions in increasing order of index size.
-    val (segments, legacySegments) = segment.span(split => split
+    val (segments, legacySegments) = segment.partition(split => split
       .asInstanceOf[IndexInputSplitWrapper].getDistributable.getSegment.getIndexSize > 0)
     val sortedPartitions = segments.sortWith(_.asInstanceOf[IndexInputSplitWrapper]
                                               .getDistributable.getSegment.getIndexSize >
@@ -94,7 +94,7 @@ object DistributedRDDUtils {
   private def convertToPartition(segments: Seq[InputSplit], legacySegments: Seq[InputSplit],
       tableUniqueName: String,
       executorList: Map[String, Seq[String]]): Seq[InputSplit] = {
-    if (legacySegments.nonEmpty) {
+    val legacySegmentsInputSplit = if (legacySegments.nonEmpty) {
       val validExecutorIds = executorList.flatMap {
         case (host, executors) => executors.map {
           executor => s"${host}_$executor"
@@ -108,7 +108,8 @@ object DistributedRDDUtils {
           wrapper.setLocations(Array("executor_" + executor))
           legacySegment
       }
-    } else { Seq() } ++ segments.map { partition =>
+    } else { Seq() }
+    legacySegmentsInputSplit ++ segments.map { partition =>
       val wrapper: IndexInputSplit = partition.asInstanceOf[IndexInputSplitWrapper]
         .getDistributable
       wrapper.setLocations(Array(DistributedRDDUtils
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
index b36f7ea..4cdc933 100644
--- a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -30,7 +30,7 @@ import org.scalatest.{BeforeAndAfterEach, FunSuite}
 import org.apache.carbondata.core.index.{IndexInputFormat, Segment}
 import org.apache.carbondata.core.index.dev.expr.IndexInputSplitWrapper
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexInputSplit
-import org.apache.carbondata.indexserver.{DistributedIndexJob, DistributedRDDUtils}
+import org.apache.carbondata.indexserver.{DistributedIndexJob, DistributedRDDUtils, IndexRDDPartition}
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 
 class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
@@ -114,6 +114,38 @@ class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
     }
   }
 
+  test("Test distribution for legacy segments and non-legacy segments") {
+    val executorList = (0 until 10).map {
+      host =>
+        val executorIds = (0 until 2).map {
+          executor => executor.toString
+        }
+        (host.toString, executorIds)
+    }.toMap
+    val indexDistributableWrapper = (0 to 10).map {
+      i =>
+        val segment = new Segment(i.toString)
+        if (i < 5) { segment.setIndexSize(0) } else {
+          segment.setIndexSize(1)
+        }
+        val blockletIndexDistributable = new BlockletIndexInputSplit(i.toString)
+        blockletIndexDistributable.setSegment(segment)
+        new IndexInputSplitWrapper("", blockletIndexDistributable)
+    }
+
+    val partitions = DistributedRDDUtils
+      .getExecutors(indexDistributableWrapper.toArray, executorList, "default_table1", 1)
+    var size = 0
+    partitions.zipWithIndex.foreach {
+      case (partition: IndexRDDPartition, _) =>
+        size += partition.inputSplit.size
+    }
+    assert(size == 11)
+    DistributedRDDUtils.executorToCacheSizeMapping.asScala.foreach {
+      a => a._2.values().asScala.foreach(size => assert(size == 1 || size == 1))
+    }
+  }
+
   test("Test distribution for non legacy segments") {
     val executorList = (0 until 10).map {
       host =>