You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/31 07:01:50 UTC

[carbondata] branch master updated: [CARBONDATA-3399] Implement executor id based distribution for indexserver

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new fa3e392  [CARBONDATA-3399] Implement executor id based distribution for indexserver
fa3e392 is described below

commit fa3e392c17ff1867baa6ac1ae918346e76ac1add
Author: kunal642 <ku...@gmail.com>
AuthorDate: Mon May 27 12:41:54 2019 +0530

    [CARBONDATA-3399] Implement executor id based distribution for indexserver
    
    This closes #3237
---
 .../apache/spark/sql/hive/DistributionUtil.scala   |   8 +
 .../indexserver/DistributedPruneRDD.scala          |   9 +-
 .../indexserver/DistributedRDDUtils.scala          | 218 ++++++++++++++++-----
 .../indexserver/DistributedRDDUtilsTest.scala      | 115 +++++++++++
 4 files changed, 300 insertions(+), 50 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 0861d2b..4256777 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -89,6 +89,14 @@ object DistributionUtil {
     }
   }
 
+  /**
+   * Groups the executor ids of the peers of this context's block manager by host.
+   *
+   * @param sparkContext context whose block manager master is asked for the peers
+   * @return map of host to the executor ids reported for that host
+   */
+  def getExecutors(sparkContext: SparkContext): Map[String, Seq[String]] = {
+    val blockManager = sparkContext.env.blockManager
+    val peers = blockManager.master.getPeers(blockManager.blockManagerId)
+    for ((host, peersOnHost) <- peers.groupBy(peer => peer.host)) yield {
+      host -> peersOnHost.map(peer => peer.executorId)
+    }
+  }
+
   private def getLocalhostIPs = {
     val iface = NetworkInterface.getNetworkInterfaces
     var addresses: List[InterfaceAddress] = List.empty
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index d2dab2d..607f923 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
-private[indexserver] class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
+class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
   extends Partition {
 
   override def index: Int = idx
@@ -50,8 +50,6 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
     dataMapFormat: DistributableDataMapFormat)
   extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) {
 
-  val executorsList: Set[String] = DistributionUtil.getNodeList(ss.sparkContext).toSet
-
   @transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
     .getName)
 
@@ -106,7 +104,8 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        val executorIP = SparkEnv.get.blockManager.blockManagerId.host
+        val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+          SparkEnv.get.blockManager.blockManagerId.executorId}"
         val value = (executorIP + "_" + cacheSize.toString, reader.getCurrentValue)
         value
       }
@@ -125,6 +124,8 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
         f => new DataMapRDDPartition(id, f._2, f._1)
       }.toArray
     } else {
+      val executorsList: Map[String, Seq[String]] = DistributionUtil
+        .getExecutors(ss.sparkContext)
       val (response, time) = CarbonScalaUtil.logTime {
         DistributedRDDUtils.getExecutors(splits.toArray, executorsList, dataMapFormat
           .getCarbonTable.getTableUniqueName, id)
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index c381f80..c7632be 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
-import org.apache.commons.lang.StringUtils
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.spark.Partition
 
@@ -29,14 +28,14 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
 
 object DistributedRDDUtils {
-  // Segment number to executorNode mapping
+  // table unique name -> (segment number -> "host_executorId") mapping
-  val segmentToExecutorMapping: java.util.Map[String, String] =
-    new ConcurrentHashMap[String, String]()
+  val tableToExecutorMapping: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] =
+    new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]()
 
-  // executorNode to segmentSize mapping
+  // host -> (executor id -> cache size accounted to that executor) mapping
-  val executorToCacheSizeMapping: java.util.Map[String, Long] =
-    new ConcurrentHashMap[String, Long]()
+  val executorToCacheSizeMapping: ConcurrentHashMap[String, ConcurrentHashMap[String, Long]] =
+    new ConcurrentHashMap[String, ConcurrentHashMap[String, Long]]()
 
-  def getExecutors(segment: Array[InputSplit], executorsList : Set[String],
+  def getExecutors(segment: Array[InputSplit], executorsList : Map[String, Seq[String]],
       tableUniqueName: String, rddId: Int): Seq[Partition] = {
     // sort the partitions in increasing order of index size.
     val (segments, legacySegments) = segment.span(split => split
@@ -47,10 +46,23 @@ object DistributedRDDUtils {
                                               .getDistributable.getSegment.getIndexSize)
     val executorCache = DistributedRDDUtils.executorToCacheSizeMapping
     // check if any executor is dead.
-    val invalidExecutors = executorCache.keySet().asScala.diff(executorsList)
-    if (invalidExecutors.nonEmpty) {
+    val invalidHosts = executorCache.keySet().asScala.diff(executorsList.keySet)
+    if (invalidHosts.nonEmpty) {
       // extract the dead executor host name
-      DistributedRDDUtils.invalidateExecutors(invalidExecutors.toSeq)
+      DistributedRDDUtils.invalidateHosts(invalidHosts.toSeq)
+    }
+    val invalidExecutorIds = executorsList.collect {
+      case (host, executors) if executorCache.get(host) != null =>
+        val toBeRemovedExecutors = executorCache.get(host).keySet().asScala.diff(executors.toSet)
+        if (executors.size == toBeRemovedExecutors.size) {
+          DistributedRDDUtils.invalidateHosts(Seq(host))
+          Seq()
+        } else {
+          toBeRemovedExecutors.map(executor => host + "_" + executor)
+        }
+    }.flatten
+    if (invalidExecutorIds.nonEmpty) {
+      DistributedRDDUtils.invalidateExecutors(invalidExecutorIds.toSeq)
     }
     (convertToPartition(legacySegments, tableUniqueName, executorsList) ++
      convertToPartition(sortedPartitions, tableUniqueName, executorsList)).zipWithIndex.map {
@@ -60,7 +72,7 @@ object DistributedRDDUtils {
   }
 
   private def convertToPartition(segments: Seq[InputSplit], tableUniqueName: String,
-      executorList: Set[String]): Seq[InputSplit] = {
+      executorList: Map[String, Seq[String]]): Seq[InputSplit] = {
     segments.map { partition =>
       val wrapper: DataMapDistributable = partition.asInstanceOf[DataMapDistributableWrapper]
         .getDistributable
@@ -82,19 +94,27 @@ object DistributedRDDUtils {
         executorCacheSize =>
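-          // executorCacheSize would be in the form of 127.0.0.1_10024 where the left of '_'
-          // would be the executor IP and the right would be the cache that executor is holding.
+          // executorCacheSize is of the form host_executorId_cacheSize (e.g. 127.0.0.1_1_10024);
+          // it is split on its last two '_' into host, executor id and that executor's cache size.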
-          val executorIP = executorCacheSize.substring(0, executorCacheSize.lastIndexOf('_'))
+          val hostAndExecutor = executorCacheSize.substring(0,
+            executorCacheSize.lastIndexOf('_'))
+          val (host, executor) = (hostAndExecutor
+            .substring(0, hostAndExecutor.lastIndexOf('_')), hostAndExecutor
+            .substring(hostAndExecutor.lastIndexOf('_') + 1, hostAndExecutor.length))
           val size = executorCacheSize.substring(executorCacheSize.lastIndexOf('_') + 1,
             executorCacheSize.length)
-          executorToCacheSizeMapping.put(executorIP, size.toLong)
+          val executorMapping = executorToCacheSizeMapping.get(host)
+          if (executorMapping != null) {
+            executorMapping.put(executor, size.toLong)
+            executorToCacheSizeMapping.put(host, executorMapping)
+          }
       }
     }
   }
 
   def invalidateCache(tableUniqueName: String): Unit = {
-    segmentToExecutorMapping.keySet().asScala.foreach {
+    tableToExecutorMapping.keySet().asScala.foreach {
       key =>
-        if (key.split("_")(0).equalsIgnoreCase(tableUniqueName)) {
-          segmentToExecutorMapping.remove(key)
+        if (key.equalsIgnoreCase(tableUniqueName)) {
+          tableToExecutorMapping.remove(key)
         }
     }
   }
@@ -103,22 +123,77 @@ object DistributedRDDUtils {
    * Invalidate the dead executors from the mapping and assign the segments to some other
    * executor, so that the query can load the segments to the new assigned executor.
    */
-  def invalidateExecutors(invalidExecutors: Seq[String]): Unit = synchronized {
+  def invalidateHosts(invalidHosts: Seq[String]): Unit = {
+    synchronized {
+      val validInvalidExecutors: Map[String, String] = invalidHosts.flatMap { host =>
+        // remove() returns null for a host that is not cached; such a host has nothing to move.
+        val invalidExecutorToSizeMapping = Option(executorToCacheSizeMapping.remove(host))
+          .map(executorToSize => executorToSize.asScala.toSeq)
+          .getOrElse(Seq.empty)
+        invalidExecutorToSizeMapping.flatMap {
+          case (invalidExecutor, size) =>
+            // None means that no executor is left to take over: record no reassignment at all
+            // rather than a placeholder "" -> "" entry.
+            getLeastLoadedExecutor.map {
+              case (reassignedHost, reassignedExecutorId) =>
+                // fall back to a fresh mapping if the host entry is missing.
+                val reassignedMapping = Option(executorToCacheSizeMapping.get(reassignedHost))
+                  .getOrElse(new ConcurrentHashMap[String, Long]())
+                // an executor without an entry reads as 0.
+                val existingSize = reassignedMapping.get(reassignedExecutorId)
+                reassignedMapping.put(reassignedExecutorId, existingSize + size)
+                executorToCacheSizeMapping.put(reassignedHost, reassignedMapping)
+                s"${host}_$invalidExecutor" -> s"${ reassignedHost }_$reassignedExecutorId"
+            }
+        }
+      }.toMap
+      updateTableMappingForInvalidExecutors(validInvalidExecutors)
+    }
+  }
+
+  /**
+   * Repoints, in every table's segment mapping, the segments that are mapped to an invalidated
+   * executor to the executor that replaces it.
+   *
+   * @param validInvalidExecutors invalidated "host_executorId" to its replacement
+   *                              "host_executorId"
+   */
+  private def updateTableMappingForInvalidExecutors(
+      validInvalidExecutors: Map[String, String]): Unit = {
     // remove all invalidExecutor mapping from cache.
-    for ((key: String, value: String) <- segmentToExecutorMapping.asScala) {
+    for (segmentToExecutorMapping <- tableToExecutorMapping.values().asScala) {
       // find the invalid executor in cache.
-      if (invalidExecutors.contains(value)) {
-        // remove mapping for the invalid executor.
-        val invalidExecutorSize = executorToCacheSizeMapping.remove(key)
-        // find a new executor for the segment
-        val reassignedExecutor = getLeastLoadedExecutor
-        segmentToExecutorMapping.put(key, reassignedExecutor)
-        // add the size size of the invalid executor to the reassigned executor.
-        executorToCacheSizeMapping.put(
-          reassignedExecutor,
-          executorToCacheSizeMapping.get(reassignedExecutor) + invalidExecutorSize
-        )
+      val newSegmentToExecutorMapping = new ConcurrentHashMap[String, String]()
+      segmentToExecutorMapping.asScala.foreach {
+        case (segmentNumber, executorUniqueName) =>
+          // the replacement is already a complete "host_executorId"; it is stored unchanged
+          // because re-splitting it on '_' would truncate a host name that contains '_'.
+          validInvalidExecutors.get(executorUniqueName).foreach { newExecutorId =>
+            newSegmentToExecutorMapping.put(segmentNumber, newExecutorId)
+          }
       }
+      // applied after the scan, on the mapping that was scanned, so no second lookup by table
+      // name (which could return null) is needed.
+      segmentToExecutorMapping.putAll(newSegmentToExecutorMapping)
+    }
+  }
+
+  /**
+   * Removes the given dead executors from the cache-size mapping, adds the size each of them
+   * held to the least loaded remaining executor and repoints the segments of all tables from
+   * the dead executors to their replacements.
+   *
+   * @param invalidExecutors dead executors, each identified as "host_executorId"
+   */
+  def invalidateExecutors(invalidExecutors: Seq[String]): Unit = {
+    synchronized {
+      val validInvalidExecutors: Map[String, String] = invalidExecutors.flatMap {
+        invalidExecutor =>
+          // split on the last '_' so that a host name containing '_' stays intact.
+          val separator = invalidExecutor.lastIndexOf('_')
+          val reassignment: Option[(String, String)] = if (separator < 0) {
+            // not of the form "host_executorId": there is nothing that could be looked up.
+            None
+          } else {
+            val host = invalidExecutor.substring(0, separator)
+            val executor = invalidExecutor.substring(separator + 1)
+            // the host may not be cached (any more); then there is no size to hand over.
+            val invalidExecutorSize = Option(executorToCacheSizeMapping.get(host))
+              .map(executorToSize => executorToSize.remove(executor))
+              .getOrElse(0L)
+            // None means that no executor is left to take over: record no reassignment at all
+            // rather than a placeholder "" -> "" entry.
+            getLeastLoadedExecutor.map {
+              case (reassignedHost, reassignedExecutorId) =>
+                val reassignedMapping = Option(executorToCacheSizeMapping.get(reassignedHost))
+                  .getOrElse(new ConcurrentHashMap[String, Long]())
+                // an executor without an entry reads as 0.
+                val existingSize = reassignedMapping.get(reassignedExecutorId)
+                reassignedMapping.put(reassignedExecutorId, existingSize + invalidExecutorSize)
+                executorToCacheSizeMapping.put(reassignedHost, reassignedMapping)
+                invalidExecutor -> s"${ reassignedHost }_$reassignedExecutorId"
+            }
+          }
+          reassignment
+      }.toMap
+      updateTableMappingForInvalidExecutors(validInvalidExecutors)
+    }
+  }
 
@@ -127,8 +202,30 @@ object DistributedRDDUtils {
    *
    * @return
    */
-  private def getLeastLoadedExecutor: String = {
-    executorToCacheSizeMapping.asScala.toSeq.sortWith(_._2 < _._2).head._1
+  private def getLeastLoadedExecutor: Option[(String, String)] = {
+    // flatten host -> (executor -> size) into (host, executor, size) triples
+    val executorLoads = for {
+      (host, executorToCacheMap) <- executorToCacheSizeMapping.asScala.toSeq
+      (executor, size) <- executorToCacheMap.asScala.toSeq
+    } yield (host, executor, size)
+    if (executorLoads.isEmpty) {
+      None
+    } else {
+      // the first triple carrying the smallest size is the one that is picked
+      val (leastHost, leastExecutor, _) = executorLoads.minBy(load => load._3)
+      Some((leastHost, leastExecutor))
+    }
+  }
+
+  /**
+   * Finds an executor that has no entry in the cache-size mapping yet.
+   *
+   * @param validExecutorIds candidate executors, each identified as "host_executorId"
+   * @return the first candidate without an entry, or None when every candidate has one
+   */
+  private def checkForUnassignedExecutors(validExecutorIds: Seq[String]): Option[String] = {
+    // every "host_executorId" that already owns an entry in the cache-size mapping
+    val assignedExecutorIds = for {
+      (host, executorMap) <- executorToCacheSizeMapping.asScala.toSeq
+      executor <- executorMap.keySet().asScala.toSeq
+    } yield s"${ host }_$executor"
+    validExecutorIds.diff(assignedExecutorIds).headOption
   }
 
   /**
@@ -137,27 +234,56 @@ object DistributedRDDUtils {
    *
    * @return
    */
-  def assignExecutor(tableName: String, segment: Segment, validExecutors: Set[String]): String = {
-    val cacheKey = s"${ tableName }_${ segment.getSegmentNo }"
-    val executor = segmentToExecutorMapping.get(cacheKey)
-    if (executor != null) {
-      executor
+  def assignExecutor(tableUniqueName: String,
+      segment: Segment,
+      validExecutors: Map[String, Seq[String]]): String = {
+    val segmentMapping = tableToExecutorMapping.get(tableUniqueName)
+    // lazy, so the segment mapping is only dereferenced after the null check below.
+    lazy val executor = segmentMapping.get(segment.getSegmentNo)
+    if (segmentMapping != null && executor != null) {
+      s"executor_$executor"
     } else {
       // check if any executor is not assigned. If yes then give priority to that executor
       // otherwise get the executor which has handled the least size.
-      val unassignedExecutors = validExecutors
-        .diff(executorToCacheSizeMapping.asScala.keys.toSet)
-      val newExecutor = if (unassignedExecutors.nonEmpty) {
-        unassignedExecutors.head.split(":")(0)
+      val validExecutorIds = validExecutors.flatMap {
+        case (host, executors) => executors.map {
+          executor => s"${host}_$executor"
+        }
+      }.toSeq
+      val (newHost, newExecutor) = checkForUnassignedExecutors(validExecutorIds) match {
+        case Some(freeExecutor) =>
+          // split on the last '_' so that a host name containing '_' stays intact.
+          val separator = freeExecutor.lastIndexOf('_')
+          (freeExecutor.substring(0, separator), freeExecutor.substring(separator + 1))
+        case None =>
+          getLeastLoadedExecutor.getOrElse(
+            throw new RuntimeException("Could not find any alive executors."))
+      }
+      // the index size is added without narrowing it to Int, which would overflow for values
+      // above Int.MaxValue and disagreed with the branch that creates a new host entry.
+      val existingExecutorMapping = executorToCacheSizeMapping.get(newHost)
+      if (existingExecutorMapping != null) {
+        // an executor without an entry reads as 0.
+        val existingSize = existingExecutorMapping.get(newExecutor)
+        existingExecutorMapping.put(newExecutor, existingSize + segment.getIndexSize)
+      } else {
+        val newExecutorMapping = new ConcurrentHashMap[String, Long]()
+        newExecutorMapping.put(newExecutor, segment.getIndexSize)
+        executorToCacheSizeMapping.put(newHost, newExecutorMapping)
+      }
+      // remember the choice so that later lookups of this segment return the same executor.
+      val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
+      if (existingSegmentMapping == null) {
+        val newSegmentMapping = new ConcurrentHashMap[String, String]()
+        newSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
+        tableToExecutorMapping.put(tableUniqueName, newSegmentMapping)
       } else {
-        val identifiedExecutor = getLeastLoadedExecutor
-        identifiedExecutor
+        existingSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
       }
-      val existingExecutorSize = executorToCacheSizeMapping.get(newExecutor)
-      executorToCacheSizeMapping.put(newExecutor, existingExecutorSize + segment.getIndexSize
-        .toInt)
-      segmentToExecutorMapping.put(cacheKey, newExecutor)
-      newExecutor
+      s"executor_${newHost}_$newExecutor"
     }
   }
 
diff --git a/integration/spark2/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark2/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
new file mode 100644
index 0000000..94a9944
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -0,0 +1,115 @@
+package org.apache.indexserver
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.indexserver.DistributedRDDUtils
+
+/**
+ * Tests the host and executor bookkeeping of DistributedRDDUtils: invalidation of dead hosts
+ * and executors, and the distribution of segments over the available executors.
+ */
+class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
+
+  val executorCache: ConcurrentHashMap[String, ConcurrentHashMap[String, Long]] =
+    DistributedRDDUtils.executorToCacheSizeMapping
+
+  val tableCache: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] =
+    DistributedRDDUtils.tableToExecutorMapping
+
+  // both caches are members of a singleton object, so each test re-seeds them from scratch.
+  override protected def beforeEach(): Unit = {
+    executorCache.clear()
+    tableCache.clear()
+    buildTestData
+  }
+
+  /**
+   * Seeds hosts IP1 and IP2 with executors EID1 and EID2 (cache size 1 each) and maps
+   * segments 0 and 1 of Table1 to the two executors of IP1.
+   */
+  def buildTestData: Unit = {
+    val tableMap = new ConcurrentHashMap[String, String]
+    tableMap.put("0", "IP1_EID1")
+    tableMap.put("1", "IP1_EID2")
+    tableCache.put("Table1", tableMap)
+    val executorMap1 = new ConcurrentHashMap[String, Long]
+    executorMap1.put("EID1", 1L)
+    executorMap1.put("EID2", 1L)
+    val executorMap2 = new ConcurrentHashMap[String, Long]
+    executorMap2.put("EID1", 1L)
+    executorMap2.put("EID2", 1L)
+    executorCache.put("IP1", executorMap1)
+    executorCache.put("IP2", executorMap2)
+  }
+
+  // ten hosts "0" to "9", each offering the executors "0" and "1".
+  private def buildExecutorList: Map[String, Seq[String]] = {
+    (0 until 10).map { host =>
+      (host.toString, (0 until 2).map(executor => executor.toString))
+    }.toMap
+  }
+
+  // one wrapper per segment "0" to "5010", every segment carrying the given index size.
+  private def buildDistributables(indexSize: Int): Seq[DataMapDistributableWrapper] = {
+    (0 to 5010).map { i =>
+      val segment = new Segment(i.toString)
+      segment.setIndexSize(indexSize)
+      val blockletDataMapDistributable = new BlockletDataMapDistributable(i.toString)
+      blockletDataMapDistributable.setSegment(segment)
+      new DataMapDistributableWrapper("", blockletDataMapDistributable)
+    }
+  }
+
+  test("test server mappings when 1 host is dead") {
+    DistributedRDDUtils.invalidateHosts(Seq("IP1"))
+    assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 1)
+    assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
+    assert(DistributedRDDUtils.tableToExecutorMapping.get("Table1").size() == 2)
+    // the mapped values are "host_executorId", so the host prefix has to be inspected; an
+    // exact comparison with "IP1" could never fail.
+    assert(!DistributedRDDUtils.tableToExecutorMapping.get("Table1").values().asScala
+      .exists(executor => executor.startsWith("IP1_")))
+  }
+
+  test("test server mappings when all executor hosts are dead") {
+    DistributedRDDUtils.invalidateHosts(Seq("IP1", "IP2"))
+    assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 0)
+    assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
+    assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP2"))
+    // table cache may be present because even if the executor comes up it can handle further
+    // requests. If another executor is up then reassignment will happen.
+  }
+
+  test("test server mappings when 1 executor is dead") {
+    DistributedRDDUtils.invalidateExecutors(Seq("IP1_EID1"))
+    assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 2)
+    assert(DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
+    // ConcurrentHashMap.contains looks the argument up among the values; the executor id is a
+    // key of this map, so containsKey is the check that can detect a stale entry.
+    assert(!DistributedRDDUtils.executorToCacheSizeMapping.get("IP1").containsKey("EID1"))
+    assert(DistributedRDDUtils.tableToExecutorMapping.get("Table1").size() == 2)
+    assert(!DistributedRDDUtils.tableToExecutorMapping.get("Table1").get("0")
+      .equalsIgnoreCase("IP1_EID1"))
+  }
+
+  test("Test distribution for legacy segments") {
+    DistributedRDDUtils
+      .getExecutors(buildDistributables(1).toArray, buildExecutorList, "default_table1", 1)
+    DistributedRDDUtils.executorToCacheSizeMapping.asScala.foreach {
+      case (_, executorToSize) =>
+        executorToSize.values().asScala.foreach(size => assert(size == 250 || size == 251))
+    }
+  }
+
+  test("Test distribution for non legacy segments") {
+    DistributedRDDUtils
+      .getExecutors(buildDistributables(111).toArray, buildExecutorList, "default_table1", 1)
+    DistributedRDDUtils.executorToCacheSizeMapping.asScala.foreach {
+      case (_, executorToSize) =>
+        executorToSize.values().asScala.foreach(size => assert(size > 27500 && size < 28000))
+    }
+  }
+}