Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/31 07:01:50 UTC
[carbondata] branch master updated: [CARBONDATA-3399] Implement executor id based distribution for indexserver
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new fa3e392 [CARBONDATA-3399] Implement executor id based distribution for indexserver
fa3e392 is described below
commit fa3e392c17ff1867baa6ac1ae918346e76ac1add
Author: kunal642 <ku...@gmail.com>
AuthorDate: Mon May 27 12:41:54 2019 +0530
[CARBONDATA-3399] Implement executor id based distribution for indexserver
This closes #3237
---
.../apache/spark/sql/hive/DistributionUtil.scala | 8 +
.../indexserver/DistributedPruneRDD.scala | 9 +-
.../indexserver/DistributedRDDUtils.scala | 218 ++++++++++++++++-----
.../indexserver/DistributedRDDUtilsTest.scala | 115 +++++++++++
4 files changed, 300 insertions(+), 50 deletions(-)
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 0861d2b..4256777 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -89,6 +89,14 @@ object DistributionUtil {
}
}
+ def getExecutors(sparkContext: SparkContext): Map[String, Seq[String]] = {
+ val bm = sparkContext.env.blockManager
+ bm.master.getPeers(bm.blockManagerId)
+ .groupBy(blockManagerId => blockManagerId.host).map {
+ case (host, blockManagerIds) => (host, blockManagerIds.map(_.executorId))
+ }
+ }
+
private def getLocalhostIPs = {
val iface = NetworkInterface.getNetworkInterfaces
var addresses: List[InterfaceAddress] = List.empty
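
The new getExecutors helper asks the BlockManager master for its peers and groups them by host, so the index server now sees every executor id running on each node instead of a flat node list. Below is a minimal, self-contained sketch of that grouping; the BlockManagerId case class is a hypothetical stand-in for org.apache.spark.storage.BlockManagerId, which would need a live SparkContext to obtain.

    // Hypothetical stand-in for org.apache.spark.storage.BlockManagerId.
    case class BlockManagerId(executorId: String, host: String)

    object GroupingSketch {
      // Same shape as DistributionUtil.getExecutors: host -> executor ids on that host.
      def groupByHost(peers: Seq[BlockManagerId]): Map[String, Seq[String]] =
        peers.groupBy(_.host).map {
          case (host, ids) => (host, ids.map(_.executorId))
        }

      def main(args: Array[String]): Unit = {
        val peers = Seq(
          BlockManagerId("1", "192.168.0.11"),
          BlockManagerId("2", "192.168.0.11"),
          BlockManagerId("3", "192.168.0.12"))
        // Map(192.168.0.11 -> List(1, 2), 192.168.0.12 -> List(3))
        println(groupByHost(peers))
      }
    }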
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index d2dab2d..607f923 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.rdd.CarbonRDD
import org.apache.carbondata.spark.util.CarbonScalaUtil
-private[indexserver] class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
+class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit)
extends Partition {
override def index: Int = idx
@@ -50,8 +50,6 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
dataMapFormat: DistributableDataMapFormat)
extends CarbonRDD[(String, ExtendedBlocklet)](ss, Nil) {
- val executorsList: Set[String] = DistributionUtil.getNodeList(ss.sparkContext).toSet
-
@transient private val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD]
.getName)
@@ -106,7 +104,8 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
- val executorIP = SparkEnv.get.blockManager.blockManagerId.host
+ val executorIP = s"${ SparkEnv.get.blockManager.blockManagerId.host }_${
+ SparkEnv.get.blockManager.blockManagerId.executorId}"
val value = (executorIP + "_" + cacheSize.toString, reader.getCurrentValue)
value
}
@@ -125,6 +124,8 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
f => new DataMapRDDPartition(id, f._2, f._1)
}.toArray
} else {
+ val executorsList: Map[String, Seq[String]] = DistributionUtil
+ .getExecutors(ss.sparkContext)
val (response, time) = CarbonScalaUtil.logTime {
DistributedRDDUtils.getExecutors(splits.toArray, executorsList, dataMapFormat
.getCarbonTable.getTableUniqueName, id)
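
With this change each pruning task identifies itself as "<host>_<executorId>_<cacheSize>" rather than just "<host>_<cacheSize>", and the driver splits the key back apart from the right with lastIndexOf('_'). A self-contained sketch of that round trip (the object and method names here are illustrative, not part of the patch); it relies on hosts being IPs or hostnames, which contain no '_' themselves:

    object ExecutorKeySketch {
      // Build the key the way DistributedPruneRDD now does on the executor side.
      def encode(host: String, executorId: String, cacheSize: Long): String =
        s"${host}_${executorId}_$cacheSize"

      // Split from the right, as updateCache does: the last '_' separates the
      // cache size, the next '_' to the left separates executor id from host.
      def decode(key: String): (String, String, Long) = {
        val sizeIdx = key.lastIndexOf('_')
        val hostAndExecutor = key.substring(0, sizeIdx)
        val execIdx = hostAndExecutor.lastIndexOf('_')
        (hostAndExecutor.substring(0, execIdx),
          hostAndExecutor.substring(execIdx + 1),
          key.substring(sizeIdx + 1).toLong)
      }

      def main(args: Array[String]): Unit = {
        val key = encode("127.0.0.1", "2", 10024L)
        assert(decode(key) == (("127.0.0.1", "2", 10024L)))
      }
    }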
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index c381f80..c7632be 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -20,7 +20,6 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
-import org.apache.commons.lang.StringUtils
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.spark.Partition
@@ -29,14 +28,14 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
object DistributedRDDUtils {
- // Segment number to executorNode mapping
+ // tableName -> (segment number -> executor id) mapping
- val segmentToExecutorMapping: java.util.Map[String, String] =
- new ConcurrentHashMap[String, String]()
+ val tableToExecutorMapping: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] =
+ new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]()
- // executorNode to segmentSize mapping
+ // host -> (executor id -> cache size) mapping
- val executorToCacheSizeMapping: java.util.Map[String, Long] =
- new ConcurrentHashMap[String, Long]()
+ val executorToCacheSizeMapping: ConcurrentHashMap[String, ConcurrentHashMap[String, Long]] =
+ new ConcurrentHashMap[String, ConcurrentHashMap[String, Long]]()
- def getExecutors(segment: Array[InputSplit], executorsList : Set[String],
+ def getExecutors(segment: Array[InputSplit], executorsList : Map[String, Seq[String]],
tableUniqueName: String, rddId: Int): Seq[Partition] = {
// sort the partitions in increasing order of index size.
val (segments, legacySegments) = segment.span(split => split
@@ -47,10 +46,23 @@ object DistributedRDDUtils {
.getDistributable.getSegment.getIndexSize)
val executorCache = DistributedRDDUtils.executorToCacheSizeMapping
// check if any executor is dead.
- val invalidExecutors = executorCache.keySet().asScala.diff(executorsList)
- if (invalidExecutors.nonEmpty) {
+ val invalidHosts = executorCache.keySet().asScala.diff(executorsList.keySet)
+ if (invalidHosts.nonEmpty) {
// extract the dead executor host name
- DistributedRDDUtils.invalidateExecutors(invalidExecutors.toSeq)
+ DistributedRDDUtils.invalidateHosts(invalidHosts.toSeq)
+ }
+ val invalidExecutorIds = executorsList.collect {
+ case (host, executors) if executorCache.get(host) != null =>
+ val toBeRemovedExecutors = executorCache.get(host).keySet().asScala.diff(executors.toSet)
+ if (executors.size == toBeRemovedExecutors.size) {
+ DistributedRDDUtils.invalidateHosts(Seq(host))
+ Seq()
+ } else {
+ toBeRemovedExecutors.map(executor => host + "_" + executor)
+ }
+ }.flatten
+ if (invalidExecutorIds.nonEmpty) {
+ DistributedRDDUtils.invalidateExecutors(invalidExecutorIds.toSeq)
}
(convertToPartition(legacySegments, tableUniqueName, executorsList) ++
convertToPartition(sortedPartitions, tableUniqueName, executorsList)).zipWithIndex.map {
@@ -60,7 +72,7 @@ object DistributedRDDUtils {
}
private def convertToPartition(segments: Seq[InputSplit], tableUniqueName: String,
- executorList: Set[String]): Seq[InputSplit] = {
+ executorList: Map[String, Seq[String]]): Seq[InputSplit] = {
segments.map { partition =>
val wrapper: DataMapDistributable = partition.asInstanceOf[DataMapDistributableWrapper]
.getDistributable
@@ -82,19 +94,27 @@ object DistributedRDDUtils {
executorCacheSize =>
- // executorCacheSize would be in the form of 127.0.0.1_10024 where the left of '_'
- // would be the executor IP and the right would be the cache that executor is holding.
+ // executorCacheSize is now of the form 127.0.0.1_1_10024: the executor host, the
+ // executor id and the cache size that executor holds, joined by '_'.
- val executorIP = executorCacheSize.substring(0, executorCacheSize.lastIndexOf('_'))
+ val hostAndExecutor = executorCacheSize.substring(0,
+ executorCacheSize.lastIndexOf('_'))
+ val (host, executor) = (hostAndExecutor
+ .substring(0, hostAndExecutor.lastIndexOf('_')), hostAndExecutor
+ .substring(hostAndExecutor.lastIndexOf('_') + 1, hostAndExecutor.length))
val size = executorCacheSize.substring(executorCacheSize.lastIndexOf('_') + 1,
executorCacheSize.length)
- executorToCacheSizeMapping.put(executorIP, size.toLong)
+ val executorMapping = executorToCacheSizeMapping.get(host)
+ if (executorMapping != null) {
+ executorMapping.put(executor, size.toLong)
+ executorToCacheSizeMapping.put(host, executorMapping)
+ }
}
}
}
def invalidateCache(tableUniqueName: String): Unit = {
- segmentToExecutorMapping.keySet().asScala.foreach {
+ tableToExecutorMapping.keySet().asScala.foreach {
key =>
- if (key.split("_")(0).equalsIgnoreCase(tableUniqueName)) {
- segmentToExecutorMapping.remove(key)
+ if (key.equalsIgnoreCase(tableUniqueName)) {
+ tableToExecutorMapping.remove(key)
}
}
}
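
Both driver-side caches are now two levels deep: tableToExecutorMapping is keyed by table and then segment number, executorToCacheSizeMapping by host and then executor id. A sketch of the layout with illustrative contents, showing why invalidateCache can now drop a table with a single remove instead of scanning composite "table_segment" keys:

    import java.util.concurrent.ConcurrentHashMap

    object MappingLayoutSketch {
      def main(args: Array[String]): Unit = {
        // tableName -> (segmentNo -> "host_executorId")
        val tableToExecutorMapping =
          new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]()
        val segments = new ConcurrentHashMap[String, String]()
        segments.put("0", "192.168.0.11_1")
        segments.put("1", "192.168.0.12_3")
        tableToExecutorMapping.put("default_table1", segments)

        // host -> (executorId -> size of the index cached by that executor)
        val executorToCacheSizeMapping =
          new ConcurrentHashMap[String, ConcurrentHashMap[String, Long]]()
        val executorsOnHost = new ConcurrentHashMap[String, Long]()
        executorsOnHost.put("1", 10024L)
        executorToCacheSizeMapping.put("192.168.0.11", executorsOnHost)

        // One top-level removal invalidates the whole table.
        tableToExecutorMapping.remove("default_table1")
        println(tableToExecutorMapping.isEmpty) // true
      }
    }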
@@ -103,22 +123,77 @@ object DistributedRDDUtils {
* Invalidate the dead executors from the mapping and assign the segments to some other
* executor, so that the query can load the segments to the new assigned executor.
*/
- def invalidateExecutors(invalidExecutors: Seq[String]): Unit = synchronized {
+ def invalidateHosts(invalidHosts: Seq[String]): Unit = {
+ synchronized {
+ val validInvalidExecutors: Map[String, String] = invalidHosts.flatMap {
+ host =>
+ val invalidExecutorToSizeMapping = executorToCacheSizeMapping.remove(host)
+ invalidExecutorToSizeMapping.asScala.map {
+ case (invalidExecutor, size) =>
+ getLeastLoadedExecutor match {
+ case Some((reassignedHost, reassignedExecutorId)) =>
+ val existingExecutorMapping = executorToCacheSizeMapping.get(reassignedHost)
+ if (existingExecutorMapping != null) {
+ val existingSize = existingExecutorMapping.get(reassignedExecutorId)
+ existingExecutorMapping.put(reassignedExecutorId, existingSize + size)
+ executorToCacheSizeMapping.put(reassignedHost, existingExecutorMapping)
+ } else {
+ // the reassigned executor was taken from this map, so this branch should be
+ // unreachable; create a fresh mapping instead of calling put on null
+ val newExecutorMapping = new ConcurrentHashMap[String, Long]()
+ newExecutorMapping.put(reassignedExecutorId, size)
+ executorToCacheSizeMapping.put(reassignedHost, newExecutorMapping)
+ }
+ s"${host}_$invalidExecutor" -> s"${ reassignedHost }_$reassignedExecutorId"
+ case None => "" -> ""
+ }
+ }
+ }.toMap
+ updateTableMappingForInvalidExecutors(validInvalidExecutors)
+ }
+ }
+
+ private def updateTableMappingForInvalidExecutors(
+ validInvalidExecutors: Map[String, String]): Unit = {
// remove all invalidExecutor mapping from cache.
- for ((key: String, value: String) <- segmentToExecutorMapping.asScala) {
+ for ((tableName: String, segmentToExecutorMapping) <- tableToExecutorMapping.asScala) {
// find the invalid executor in cache.
- if (invalidExecutors.contains(value)) {
- // remove mapping for the invalid executor.
- val invalidExecutorSize = executorToCacheSizeMapping.remove(key)
- // find a new executor for the segment
- val reassignedExecutor = getLeastLoadedExecutor
- segmentToExecutorMapping.put(key, reassignedExecutor)
- // add the size size of the invalid executor to the reassigned executor.
- executorToCacheSizeMapping.put(
- reassignedExecutor,
- executorToCacheSizeMapping.get(reassignedExecutor) + invalidExecutorSize
- )
+ val newSegmentToExecutorMapping = new ConcurrentHashMap[String, String]()
+ val existingMapping = tableToExecutorMapping.get(tableName)
+ segmentToExecutorMapping.asScala.collect {
+ case (segmentNumber, executorUniqueName) if validInvalidExecutors
+ .contains(executorUniqueName) =>
+ val newExecutorId = validInvalidExecutors(executorUniqueName)
+ // remove mapping for the invalid executor.
+ val executorIdSplits = newExecutorId.split("_")
+ val (host, executorId) = (executorIdSplits(0), executorIdSplits(1))
+ // find a new executor for the segment
+ newSegmentToExecutorMapping
+ .put(segmentNumber, s"${ host }_$executorId")
}
+ existingMapping.putAll(newSegmentToExecutorMapping)
+ tableToExecutorMapping.put(tableName, existingMapping)
+ }
+ }
+
+ def invalidateExecutors(invalidExecutors: Seq[String]): Unit = {
+ synchronized {
+ val validInvalidExecutors: Map[String, String] = invalidExecutors.map {
+ invalidExecutor =>
+ val executorIdSplits = invalidExecutor.split("_")
+ val (host, executor) = (executorIdSplits(0), executorIdSplits(1))
+ val invalidExecutorSize = executorToCacheSizeMapping.get(host).remove(executor)
+ getLeastLoadedExecutor match {
+ case Some((reassignedHost, reassignedExecutorId)) =>
+ val existingExecutorMapping = executorToCacheSizeMapping.get(reassignedHost)
+ if (existingExecutorMapping != null) {
+ val existingSize = existingExecutorMapping.get(reassignedExecutorId)
+ existingExecutorMapping
+ .put(reassignedExecutorId, existingSize + invalidExecutorSize)
+ executorToCacheSizeMapping.put(reassignedHost, existingExecutorMapping)
+ } else {
+ // guard against calling put on null: register a fresh mapping for the host
+ val newExecutorMapping = new ConcurrentHashMap[String, Long]()
+ newExecutorMapping.put(reassignedExecutorId, invalidExecutorSize)
+ executorToCacheSizeMapping.put(reassignedHost, newExecutorMapping)
+ }
+ invalidExecutor -> s"${ reassignedHost }_$reassignedExecutorId"
+ case None => "" -> ""
+ }
+ }.toMap
+ updateTableMappingForInvalidExecutors(validInvalidExecutors)
}
}
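
Concretely: if host IP1 runs executors EID1 and EID2 holding one unit of cache each and EID1 dies, its unit is credited to the least loaded surviving executor and every segment that pointed at IP1_EID1 is repointed there. A compact walkthrough of that flow using plain immutable maps (mirroring the fixture of the new unit test; the real code mutates the concurrent maps in place):

    object ReassignSketch {
      def main(args: Array[String]): Unit = {
        // host -> (executorId -> cacheSize) and segmentNo -> "host_executorId"
        var executorCache = Map(
          "IP1" -> Map("EID1" -> 1L, "EID2" -> 1L),
          "IP2" -> Map("EID1" -> 1L, "EID2" -> 1L))
        var segmentMap = Map("0" -> "IP1_EID1", "1" -> "IP1_EID2")

        // EID1 on IP1 dies: drop it and remember the cache size it held.
        val deadSize = executorCache("IP1")("EID1")
        executorCache = executorCache.updated("IP1", executorCache("IP1") - "EID1")

        // Pick the least loaded survivor and credit the dead executor's size to it.
        val (host, exec, size) = (for {
          (h, execs) <- executorCache.toSeq
          (e, s) <- execs.toSeq
        } yield (h, e, s)).minBy(_._3)
        executorCache =
          executorCache.updated(host, executorCache(host).updated(exec, size + deadSize))

        // Repoint every segment that was served by the dead executor.
        segmentMap = segmentMap.map {
          case (seg, "IP1_EID1") => seg -> s"${host}_$exec"
          case other => other
        }
        println(segmentMap) // segment "0" now maps to the reassigned executor
      }
    }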
@@ -127,8 +202,30 @@ object DistributedRDDUtils {
*
* @return
*/
- private def getLeastLoadedExecutor: String = {
- executorToCacheSizeMapping.asScala.toSeq.sortWith(_._2 < _._2).head._1
+ private def getLeastLoadedExecutor: Option[(String, String)] = {
+ val leastHostExecutor = executorToCacheSizeMapping.asScala.flatMap {
+ case (host, executorToCacheMap) =>
+ executorToCacheMap.asScala.map {
+ case (executor, size) =>
+ (host, executor, size)
+ }
+ }.toSeq.sortWith(_._3 < _._3).toList
+ leastHostExecutor match {
+ case head :: _ =>
+ Some((head._1, head._2))
+ case _ => None
+ }
+ }
+
+ private def checkForUnassignedExecutors(validExecutorIds: Seq[String]): Option[String] = {
+ val usedExecutorIds = executorToCacheSizeMapping.asScala.flatMap {
+ case (host, executorMap) =>
+ executorMap.keySet().asScala.map {
+ executor => s"${ host }_$executor"
+ }
+ }
+ val unassignedExecutor = validExecutorIds.diff(usedExecutorIds.toSeq)
+ unassignedExecutor.headOption
}
/**
@@ -137,27 +234,56 @@ object DistributedRDDUtils {
*
* @return
*/
- def assignExecutor(tableName: String, segment: Segment, validExecutors: Set[String]): String = {
- val cacheKey = s"${ tableName }_${ segment.getSegmentNo }"
- val executor = segmentToExecutorMapping.get(cacheKey)
- if (executor != null) {
- executor
+ def assignExecutor(tableUniqueName: String,
+ segment: Segment,
+ validExecutors: Map[String, Seq[String]]): String = {
+ val segmentMapping = tableToExecutorMapping.get(tableUniqueName)
+ lazy val executor = segmentMapping.get(segment.getSegmentNo)
+ if (segmentMapping != null && executor != null) {
+ s"executor_$executor"
} else {
// check if any executor is not assigned. If yes then give priority to that executor
// otherwise get the executor which has handled the least size.
- val unassignedExecutors = validExecutors
- .diff(executorToCacheSizeMapping.asScala.keys.toSet)
- val newExecutor = if (unassignedExecutors.nonEmpty) {
- unassignedExecutors.head.split(":")(0)
+ val validExecutorIds = validExecutors.flatMap {
+ case (host, executors) => executors.map {
+ executor => s"${host}_$executor"
+ }
+ }.toSeq
+ val unassignedExecutor = checkForUnassignedExecutors(validExecutorIds)
+ val (newHost, newExecutor) = if (unassignedExecutor.nonEmpty) {
+ val freeExecutor = unassignedExecutor.get.split("_")
+ (freeExecutor(0), freeExecutor(1))
+ } else {
+ getLeastLoadedExecutor match {
+ case Some((host, executorID)) => (host, executorID)
+ case None => throw new RuntimeException("Could not find any alive executors.")
+ }
+ }
+ val existingExecutorMapping = executorToCacheSizeMapping.get(newHost)
+ if (existingExecutorMapping != null) {
+ val existingSize = existingExecutorMapping.get(newExecutor)
+ if (existingSize != null) {
+ // keep sizes as Long: narrowing to Int could overflow for large indexes
+ existingExecutorMapping.put(newExecutor, existingSize + segment.getIndexSize)
+ } else {
+ existingExecutorMapping.put(newExecutor, segment.getIndexSize)
+ }
+ } else {
+ val newExecutorMapping = new ConcurrentHashMap[String, Long]()
+ newExecutorMapping.put(newExecutor, segment.getIndexSize)
+ executorToCacheSizeMapping.put(newHost, newExecutorMapping)
+ }
+ val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
+ if (existingSegmentMapping == null) {
+ val newSegmentMapping = new ConcurrentHashMap[String, String]()
+ newSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
+ tableToExecutorMapping.put(tableUniqueName, newSegmentMapping)
} else {
- val identifiedExecutor = getLeastLoadedExecutor
- identifiedExecutor
+ existingSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
+ tableToExecutorMapping.put(tableUniqueName, existingSegmentMapping)
}
- val existingExecutorSize = executorToCacheSizeMapping.get(newExecutor)
- executorToCacheSizeMapping.put(newExecutor, existingExecutorSize + segment.getIndexSize
- .toInt)
- segmentToExecutorMapping.put(cacheKey, newExecutor)
- newExecutor
+ s"executor_${newHost}_$newExecutor"
}
}
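
assignExecutor's selection order is: reuse an existing segment mapping when present; otherwise prefer an executor that holds no cache at all; otherwise fall back to the least loaded executor, failing only when no executor is alive. A minimal sketch of that policy under the same "host_executorId" naming (illustrative helper names, immutable maps instead of the concurrent ones):

    object AssignSketch {
      // Executors already holding cache, flattened to "host_executorId".
      def used(cache: Map[String, Map[String, Long]]): Set[String] =
        cache.flatMap { case (h, m) => m.keys.map(e => s"${h}_$e") }.toSet

      // Prefer a never-used executor, else the one with the smallest cache.
      def pick(valid: Seq[String], cache: Map[String, Map[String, Long]]): String =
        valid.diff(used(cache).toSeq).headOption.getOrElse {
          val flat = for { (h, m) <- cache.toSeq; (e, s) <- m.toSeq }
            yield (s"${h}_$e", s)
          if (flat.isEmpty) throw new RuntimeException("Could not find any alive executors.")
          flat.minBy(_._2)._1
        }

      def main(args: Array[String]): Unit = {
        val cache = Map("IP1" -> Map("EID1" -> 100L))
        val valid = Seq("IP1_EID1", "IP1_EID2", "IP2_EID1")
        // IP1_EID2 wins: an unassigned executor beats the least loaded one.
        println(pick(valid, cache))
      }
    }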
diff --git a/integration/spark2/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark2/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
new file mode 100644
index 0000000..94a9944
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -0,0 +1,115 @@
+package org.apache.indexserver
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite}
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
+import org.apache.carbondata.indexserver.DistributedRDDUtils
+
+class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
+
+ val executorCache: ConcurrentHashMap[String, ConcurrentHashMap[String, Long]] = DistributedRDDUtils
+ .executorToCacheSizeMapping
+
+ val tableCache: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] = DistributedRDDUtils.tableToExecutorMapping
+
+ override protected def beforeEach(): Unit = {
+ executorCache.clear()
+ tableCache.clear()
+ buildTestData()
+ }
+
+ def buildTestData(): Unit = {
+ val tableMap = new ConcurrentHashMap[String, String]
+ tableMap.put("0" , "IP1_EID1")
+ tableMap.put("1", "IP1_EID2")
+ tableCache.put("Table1", tableMap)
+ val executorMap1 = new ConcurrentHashMap[String, Long]
+ executorMap1.put("EID1", 1L)
+ executorMap1.put("EID2", 1L)
+ val executorMap2 = new ConcurrentHashMap[String, Long]
+ executorMap2.put("EID1", 1L)
+ executorMap2.put("EID2", 1L)
+ executorCache.put("IP1", executorMap1)
+ executorCache.put("IP2", executorMap2)
+ }
+
+ test("test server mappings when 1 host is dead") {
+ DistributedRDDUtils.invalidateHosts(Seq("IP1"))
+ assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 1)
+ assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
+ assert(DistributedRDDUtils.tableToExecutorMapping.get("Table1").size() == 2)
+ // no segment should still point at an executor on the dead host
+ assert(!DistributedRDDUtils.tableToExecutorMapping.get("Table1").values().asScala
+ .exists(_.startsWith("IP1_")))
+ }
+
+ test("test server mappings when all executor hosts are dead") {
+ DistributedRDDUtils.invalidateHosts(Seq("IP1", "IP2"))
+ assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 0)
+ assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
+ assert(!DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP2"))
+ // table cache may be present because even if the executor comes up it can handle further
+ // requests. If another executor is up then reassignment will happen.
+ }
+
+ test("test server mappings when 1 executor is dead") {
+ DistributedRDDUtils.invalidateExecutors(Seq("IP1_EID1"))
+ assert(DistributedRDDUtils.executorToCacheSizeMapping.size() == 2)
+ assert(DistributedRDDUtils.executorToCacheSizeMapping.containsKey("IP1"))
+ assert(!DistributedRDDUtils.executorToCacheSizeMapping.get("IP1").contains("EID1"))
+ assert(DistributedRDDUtils.tableToExecutorMapping.get("Table1").size() == 2)
+ assert(!DistributedRDDUtils.tableToExecutorMapping.get("Table1").get("0").equalsIgnoreCase("IP1_EID1"))
+ }
+
+ test("Test distribution for legacy segments") {
+ val executorList = (0 until 10).map {
+ host =>
+ val executorIds = (0 until 2).map {
+ executor => executor.toString
+ }
+ (host.toString, executorIds)
+ }.toMap
+ val dataMapDistributableWrapper = (0 to 5010).map {
+ i =>
+ val segment = new Segment(i.toString)
+ segment.setIndexSize(1)
+ val blockletDataMapDistributable = new BlockletDataMapDistributable(i.toString)
+ blockletDataMapDistributable.setSegment(segment)
+ new DataMapDistributableWrapper("", blockletDataMapDistributable)
+ }
+
+ DistributedRDDUtils
+ .getExecutors(dataMapDistributableWrapper.toArray, executorList, "default_table1", 1)
+ DistributedRDDUtils.executorToCacheSizeMapping.asScala.foreach {
+ a => a._2.values().asScala.foreach(size => assert(size == 250 || size == 251))
+ }
+ }
+
+ test("Test distribution for non legacy segments") {
+ val executorList = (0 until 10).map {
+ host =>
+ val executorIds = (0 until 2).map {
+ executor => executor.toString
+ }
+ (host.toString, executorIds)
+ }.toMap
+ val dataMapDistributableWrapper = (0 to 5010).map {
+ i =>
+ val segment = new Segment(i.toString)
+ segment.setIndexSize(111)
+ val blockletDataMapDistributable = new BlockletDataMapDistributable(i.toString)
+ blockletDataMapDistributable.setSegment(segment)
+ new DataMapDistributableWrapper("", blockletDataMapDistributable)
+ }
+
+ DistributedRDDUtils
+ .getExecutors(dataMapDistributableWrapper.toArray, executorList, "default_table1", 1)
+ DistributedRDDUtils.executorToCacheSizeMapping.asScala.foreach {
+ a => a._2.values().asScala.foreach(size => assert(size > 27500 && size < 28000))
+ }
+ }
+}
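
As a sanity check on the asserted numbers in the two distribution tests: 5011 segments spread over 10 hosts with 2 executors each is 20 executors, and 5011 / 20 = 250.55, so a balanced assignment leaves every executor with 250 or 251 segments of size 1. With 111 bytes of index per segment the per-executor total is roughly 250.55 * 111 ≈ 27811, which is why the second test asserts a size strictly between 27500 and 28000.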