Posted to commits@spark.apache.org by ho...@apache.org on 2020/04/25 01:52:06 UTC
[spark] branch master updated: HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks
This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9faad07 HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks
9faad07 is described below
commit 9faad07ce706890008a8a3ce675fa95b0bdf7c14
Author: Holden Karau <hk...@apple.com>
AuthorDate: Fri Apr 24 18:51:25 2020 -0700
HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks
HOTFIX test issue introduced in SPARK-20732
Closes #28337 from holdenk/revert-SPARK-20732.
Authored-by: Holden Karau <hk...@apple.com>
Signed-off-by: Holden Karau <hk...@apple.com>
---
.../org/apache/spark/internal/config/package.scala | 28 -----
.../cluster/CoarseGrainedSchedulerBackend.scala | 17 +--
.../org/apache/spark/storage/BlockManager.scala | 129 ++-------------------
.../apache/spark/storage/BlockManagerMaster.scala | 10 --
.../spark/storage/BlockManagerMasterEndpoint.scala | 50 +-------
.../spark/storage/BlockManagerMessages.scala | 7 --
.../spark/storage/BlockManagerSlaveEndpoint.scala | 3 -
.../storage/BlockManagerDecommissionSuite.scala | 104 -----------------
.../apache/spark/storage/BlockManagerSuite.scala | 58 ---------
9 files changed, 12 insertions(+), 394 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1bc2734..5006da0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -413,34 +413,6 @@ package object config {
.intConf
.createWithDefault(1)
- private[spark] val STORAGE_DECOMMISSION_ENABLED =
- ConfigBuilder("spark.storage.decommission.enabled")
- .doc("Whether to decommission the block manager when decommissioning executor")
- .version("3.1.0")
- .booleanConf
- .createWithDefault(false)
-
- private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
- ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
- .internal()
- .doc("Maximum number of failures which can be handled for the replication of " +
- "one RDD block when block manager is decommissioning and trying to move its " +
- "existing blocks.")
- .version("3.1.0")
- .intConf
- .createWithDefault(3)
-
- private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
- ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
- .internal()
- .doc("The interval of time between consecutive cache block replication reattempts " +
- "happening on each decommissioning executor (due to storage decommissioning).")
- .version("3.1.0")
- .timeConf(TimeUnit.MILLISECONDS)
- .checkValue(_ > 0, "Time interval between two consecutive attempts of " +
- "cache block replication should be positive.")
- .createWithDefaultString("30s")
-
private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
ConfigBuilder("spark.storage.replication.topologyFile")
.version("2.1.0")
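[Editorial aside, not part of the patch: a minimal sketch of how the three
configuration keys deleted above were set from user code before this revert.
The key names and defaults come verbatim from the removed ConfigBuilder
entries; setting them after this revert is a no-op.]

    import org.apache.spark.SparkConf

    // All three keys are deleted by this revert; shown for reference only.
    val conf = new SparkConf()
      .set("spark.storage.decommission.enabled", "true")
      // removed default: tolerate 3 replication failures per RDD block
      .set("spark.storage.decommission.maxReplicationFailuresPerBlock", "3")
      // removed default: retry offloading cached blocks every 30s
      .set("spark.storage.decommission.replicationReattemptInterval", "30s")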
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 67638a5..701d69b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -438,19 +438,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
logInfo(s"Finished decommissioning executor $executorId.")
-
- if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
- try {
- logInfo("Starting decommissioning block manager corresponding to " +
- s"executor $executorId.")
- scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
- } catch {
- case e: Exception =>
- logError("Unexpected error during block manager " +
- s"decommissioning for executor $executorId: ${e.toString}", e)
- }
- logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
- }
} else {
logInfo(s"Skipping decommissioning of executor $executorId.")
}
@@ -587,7 +574,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
*/
private[spark] def decommissionExecutor(executorId: String): Unit = {
if (driverEndpoint != null) {
- logInfo("Propagating executor decommission to driver.")
+ logInfo("Propegating executor decommission to driver.")
driverEndpoint.send(DecommissionExecutor(executorId))
}
}
@@ -671,7 +658,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Update the cluster manager on our scheduling needs. Three bits of information are included
* to help it make decisions.
- * @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
+ * @param resourceProfileToNumExecutors The total number of executors we'd like to have per
* ResourceProfile. The cluster manager shouldn't kill any
* running executor to reach this number, but, if all
* existing executors were to die, this is the number
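[Editorial aside: decommissionExecutor itself survives this revert; only the
block-manager follow-up deleted above is gone. A hedged sketch of the
caller-side effect, where `backend` is a hypothetical
CoarseGrainedSchedulerBackend reference and "1" a placeholder executor id:]

    // Before the revert, this call also led the driver to invoke
    // blockManager.master.decommissionBlockManagers(Seq("1")) when
    // spark.storage.decommission.enabled was set; after the revert, only
    // the executor-level DecommissionExecutor message is sent, so cached
    // blocks on the executor are no longer offloaded first.
    backend.decommissionExecutor("1")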
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index aa15d12..e7f8de5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -54,7 +54,6 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
-import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
import org.apache.spark.storage.memory._
import org.apache.spark.unsafe.Platform
import org.apache.spark.util._
@@ -242,9 +241,6 @@ private[spark] class BlockManager(
private var blockReplicationPolicy: BlockReplicationPolicy = _
- private var blockManagerDecommissioning: Boolean = false
- private var decommissionManager: Option[BlockManagerDecommissionManager] = None
-
// A DownloadFileManager used to track all the files of remote blocks which are above the
// specified memory threshold. Files will be deleted automatically based on weak reference.
// Exposed for test
@@ -1555,22 +1551,18 @@ private[spark] class BlockManager(
}
/**
- * Replicates a block to peer block managers based on existingReplicas and maxReplicas
+ * Called for pro-active replenishment of blocks lost due to executor failures
*
* @param blockId blockId being replicate
* @param existingReplicas existing block managers that have a replica
* @param maxReplicas maximum replicas needed
- * @param maxReplicationFailures number of replication failures to tolerate before
- * giving up.
- * @return whether block was successfully replicated or not
*/
def replicateBlock(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
- maxReplicas: Int,
- maxReplicationFailures: Option[Int] = None): Boolean = {
+ maxReplicas: Int): Unit = {
logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
- blockInfoManager.lockForReading(blockId).forall { info =>
+ blockInfoManager.lockForReading(blockId).foreach { info =>
val data = doGetLocalBytes(blockId, info)
val storageLevel = StorageLevel(
useDisk = info.level.useDisk,
@@ -1578,13 +1570,11 @@ private[spark] class BlockManager(
useOffHeap = info.level.useOffHeap,
deserialized = info.level.deserialized,
replication = maxReplicas)
- // we know we are called as a result of an executor removal or because the current executor
- // is getting decommissioned. so we refresh peer cache before trying replication, we won't
- // try to replicate to a missing executor/another decommissioning executor
+ // we know we are called as a result of an executor removal, so we refresh peer cache
+ // this way, we won't try to replicate to a missing executor with a stale reference
getPeers(forceFetch = true)
try {
- replicate(
- blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures)
+ replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
} finally {
logDebug(s"Releasing lock for $blockId")
releaseLockAndDispose(blockId, data)
@@ -1601,11 +1591,9 @@ private[spark] class BlockManager(
data: BlockData,
level: StorageLevel,
classTag: ClassTag[_],
- existingReplicas: Set[BlockManagerId] = Set.empty,
- maxReplicationFailures: Option[Int] = None): Boolean = {
+ existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
- val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
- conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
+ val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
val tLevel = StorageLevel(
useDisk = level.useDisk,
useMemory = level.useMemory,
@@ -1629,7 +1617,7 @@ private[spark] class BlockManager(
blockId,
numPeersToReplicateTo)
- while(numFailures <= maxReplicationFailureCount &&
+ while(numFailures <= maxReplicationFailures &&
!peersForReplication.isEmpty &&
peersReplicatedTo.size < numPeersToReplicateTo) {
val peer = peersForReplication.head
@@ -1677,11 +1665,9 @@ private[spark] class BlockManager(
if (peersReplicatedTo.size < numPeersToReplicateTo) {
logWarning(s"Block $blockId replicated to only " +
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
- return false
}
logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
- return true
}
/**
@@ -1775,58 +1761,6 @@ private[spark] class BlockManager(
blocksToRemove.size
}
- def decommissionBlockManager(): Unit = {
- if (!blockManagerDecommissioning) {
- logInfo("Starting block manager decommissioning process")
- blockManagerDecommissioning = true
- decommissionManager = Some(new BlockManagerDecommissionManager(conf))
- decommissionManager.foreach(_.start())
- } else {
- logDebug("Block manager already in decommissioning state")
- }
- }
-
- /**
- * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
- * Visible for testing
- */
- def decommissionRddCacheBlocks(): Unit = {
- val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)
-
- if (replicateBlocksInfo.nonEmpty) {
- logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
- "for block manager decommissioning")
- }
-
- // Maximum number of storage replication failure which replicateBlock can handle
- val maxReplicationFailures = conf.get(
- config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
-
- // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
- // so that we end up prioritize them over each other
- val blocksFailedReplication = ThreadUtils.parmap(
- replicateBlocksInfo, "decommissionRddCacheBlocks", 4) {
- case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
- val replicatedSuccessfully = replicateBlock(
- blockId,
- existingReplicas.toSet,
- maxReplicas,
- maxReplicationFailures = Some(maxReplicationFailures))
- if (replicatedSuccessfully) {
- logInfo(s"Block $blockId offloaded successfully, Removing block now")
- removeBlock(blockId)
- logInfo(s"Block $blockId removed")
- } else {
- logWarning(s"Failed to offload block $blockId")
- }
- (blockId, replicatedSuccessfully)
- }.filterNot(_._2).map(_._1)
- if (blocksFailedReplication.nonEmpty) {
- logWarning("Blocks failed replication in cache decommissioning " +
- s"process: ${blocksFailedReplication.mkString(",")}")
- }
- }
-
/**
* Remove all blocks belonging to the given broadcast.
*/
@@ -1895,52 +1829,7 @@ private[spark] class BlockManager(
data.dispose()
}
- /**
- * Class to handle block manager decommissioning retries
- * It creates a Thread to retry offloading all RDD cache blocks
- */
- private class BlockManagerDecommissionManager(conf: SparkConf) {
- @volatile private var stopped = false
- private val blockReplicationThread = new Thread {
- override def run(): Unit = {
- while (blockManagerDecommissioning && !stopped) {
- try {
- logDebug("Attempting to replicate all cached RDD blocks")
- decommissionRddCacheBlocks()
- logInfo("Attempt to replicate all cached blocks done")
- val sleepInterval = conf.get(
- config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
- Thread.sleep(sleepInterval)
- } catch {
- case _: InterruptedException =>
- // no-op
- case NonFatal(e) =>
- logError("Error occurred while trying to " +
- "replicate cached RDD blocks for block manager decommissioning", e)
- }
- }
- }
- }
- blockReplicationThread.setDaemon(true)
- blockReplicationThread.setName("block-replication-thread")
-
- def start(): Unit = {
- logInfo("Starting block replication thread")
- blockReplicationThread.start()
- }
-
- def stop(): Unit = {
- if (!stopped) {
- stopped = true
- logInfo("Stopping block replication thread")
- blockReplicationThread.interrupt()
- blockReplicationThread.join()
- }
- }
- }
-
def stop(): Unit = {
- decommissionManager.foreach(_.stop())
blockTransferService.close()
if (blockStoreClient ne blockTransferService) {
// Closing should be idempotent, but maybe not for the NioBlockTransferService.
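[Editorial aside: the hunks above revert replicateBlock from a
Boolean-returning method with an optional maxReplicationFailures to the older
Unit-returning shape. A sketch of a call against the post-revert signature,
with `bm`, `blockId`, and `peerId` as hypothetical stand-ins:]

    // Pro-active replenishment after an executor loss, post-revert shape.
    // Returns Unit again, so callers can no longer branch on success.
    bm.replicateBlock(
      blockId = blockId,              // BlockId to re-replicate
      existingReplicas = Set(peerId), // BlockManagerIds already holding it
      maxReplicas = 2)                // target replica count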
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3cfa5d2..e440c1a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -43,16 +43,6 @@ class BlockManagerMaster(
logInfo("Removed " + execId + " successfully in removeExecutor")
}
- /** Decommission block managers corresponding to given set of executors */
- def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
- driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
- }
-
- /** Get Replication Info for all the RDD blocks stored in given blockManagerId */
- def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
- driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId))
- }
-
/** Request removal of a dead executor from the driver endpoint.
* This is only called on the driver side. Non-blocking
*/
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d936420..d7f7eed 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -65,9 +65,6 @@ class BlockManagerMasterEndpoint(
// Mapping from executor ID to block manager ID.
private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
- // Set of block managers which are decommissioning
- private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId]
-
// Mapping from block id to the set of block managers that have the block.
private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
@@ -156,13 +153,6 @@ class BlockManagerMasterEndpoint(
removeExecutor(execId)
context.reply(true)
- case DecommissionBlockManagers(executorIds) =>
- decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get))
- context.reply(true)
-
- case GetReplicateInfoForRDDBlocks(blockManagerId) =>
- context.reply(getReplicateInfoForRDDBlocks(blockManagerId))
-
case StopBlockManagerMaster =>
context.reply(true)
stop()
@@ -267,7 +257,6 @@ class BlockManagerMasterEndpoint(
// Remove the block manager from blockManagerIdByExecutor.
blockManagerIdByExecutor -= blockManagerId.executorId
- decommissioningBlockManagerSet.remove(blockManagerId)
// Remove it from blockManagerInfo and remove all the blocks.
blockManagerInfo.remove(blockManagerId)
@@ -310,39 +299,6 @@ class BlockManagerMasterEndpoint(
blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
}
- /**
- * Decommission the given Seq of blockmanagers
- * - Adds these block managers to decommissioningBlockManagerSet Set
- * - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]]
- */
- def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {
- val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
- val futures = newBlockManagersToDecommission.map { blockManagerId =>
- decommissioningBlockManagerSet.add(blockManagerId)
- val info = blockManagerInfo(blockManagerId)
- info.slaveEndpoint.ask[Unit](DecommissionBlockManager)
- }
- Future.sequence{ futures.toSeq }
- }
-
- /**
- * Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId
- * @param blockManagerId - block manager id for which ReplicateBlock info is needed
- * @return Seq of ReplicateBlock
- */
- private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
- val info = blockManagerInfo(blockManagerId)
-
- val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
- rddBlocks.map { blockId =>
- val currentBlockLocations = blockLocations.get(blockId)
- val maxReplicas = currentBlockLocations.size + 1
- val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
- val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
- replicateMsg
- }.toSeq
- }
-
// Remove a block from the slaves that have it. This can only be used to remove
// blocks that the master knows about.
private def removeBlockFromWorkers(blockId: BlockId): Unit = {
@@ -580,11 +536,7 @@ class BlockManagerMasterEndpoint(
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
val blockManagerIds = blockManagerInfo.keySet
if (blockManagerIds.contains(blockManagerId)) {
- blockManagerIds
- .filterNot { _.isDriver }
- .filterNot { _ == blockManagerId }
- .diff(decommissioningBlockManagerSet)
- .toSeq
+ blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
} else {
Seq.empty
}
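[Editorial aside: the getPeers change reverted just above, restated as a
self-contained sketch. The names are invented stand-ins (the real code
filters BlockManagerId values and also excludes the driver, elided here):
pre-revert, decommissioning peers were excluded as replication targets;
post-revert they are eligible again.]

    def peersPreRevert(all: Set[String], self: String,
        decommissioning: Set[String]): Seq[String] =
      all.filterNot(_ == self)
        .diff(decommissioning) // pre-revert: never pick a decommissioning peer
        .toSeq

    def peersPostRevert(all: Set[String], self: String): Seq[String] =
      all.filterNot(_ == self).toSeq // post-revert: any live, non-self peer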
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 7d4f2ff..895f48d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -36,8 +36,6 @@ private[spark] object BlockManagerMessages {
case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
extends ToBlockManagerSlave
- case object DecommissionBlockManager extends ToBlockManagerSlave
-
// Remove all blocks belonging to a specific RDD.
case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
@@ -127,11 +125,6 @@ private[spark] object BlockManagerMessages {
case object GetStorageStatus extends ToBlockManagerMaster
- case class DecommissionBlockManagers(executorIds: Seq[String]) extends ToBlockManagerMaster
-
- case class GetReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId)
- extends ToBlockManagerMaster
-
case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
extends ToBlockManagerMaster
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index a3a7149..29e2114 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -61,9 +61,6 @@ class BlockManagerSlaveEndpoint(
SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
}
- case DecommissionBlockManager =>
- context.reply(blockManager.decommissionBlockManager())
-
case RemoveBroadcast(broadcastId, _) =>
doAsync[Int]("removing broadcast " + broadcastId, context) {
blockManager.removeBroadcast(broadcastId, tellMaster = true)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
deleted file mode 100644
index 59fb056..0000000
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.Semaphore
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration._
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success}
-import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.util.ThreadUtils
-
-class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
-
- override def beforeEach(): Unit = {
- val conf = new SparkConf().setAppName("test").setMaster("local")
- .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
- .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L)
- .set(config.STORAGE_DECOMMISSION_ENABLED, true)
-
- sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf)
- }
-
- test(s"verify that an already running task which is going to cache data succeeds " +
- s"on a decommissioned executor") {
- // Create input RDD with 10 partitions
- val input = sc.parallelize(1 to 10, 10)
- val accum = sc.longAccumulator("mapperRunAccumulator")
- // Do a count to wait for the executors to be registered.
- input.count()
-
- // Create a new RDD where we have sleep in each partition, we are also increasing
- // the value of accumulator in each partition
- val sleepyRdd = input.mapPartitions { x =>
- Thread.sleep(500)
- accum.add(1)
- x
- }
-
- // Listen for the job
- val sem = new Semaphore(0)
- val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
- sc.addSparkListener(new SparkListener {
- override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
- sem.release()
- }
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
- taskEndEvents.append(taskEnd)
- }
- })
-
- // Cache the RDD lazily
- sleepyRdd.persist()
-
- // Start the computation of RDD - this step will also cache the RDD
- val asyncCount = sleepyRdd.countAsync()
-
- // Wait for the job to have started
- sem.acquire(1)
-
- // Decommission one of the executor
- val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
- val execs = sched.getExecutorIds()
- assert(execs.size == 3, s"Expected 3 executors but found ${execs.size}")
- val execToDecommission = execs.head
- sched.decommissionExecutor(execToDecommission)
-
- // Wait for job to finish
- val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 3.seconds)
- assert(asyncCountResult === 10)
- // All 10 tasks finished, so accum should have been increased 10 times
- assert(accum.value === 10)
-
- // All tasks should be successful, nothing should have failed
- sc.listenerBus.waitUntilEmpty()
- assert(taskEndEvents.size === 10) // 10 mappers
- assert(taskEndEvents.map(_.reason).toSet === Set(Success))
-
- // Since the RDD is cached, so further usage of same RDD should use the
- // cached data. Original RDD partitions should not be recomputed i.e. accum
- // should have same value like before
- assert(sleepyRdd.count() === 10)
- assert(accum.value === 10)
- }
-}
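[Editorial aside: the gist of the deleted suite above, compressed into a few
lines. Not runnable as-is; it relies on the reverted decommissioning path and
on the fixtures (sleepyRdd, sem, sched, execs, accum) defined in the deleted
test body. The pattern: cache an RDD mid-job, decommission one executor, and
assert no partition is recomputed.]

    val asyncCount = sleepyRdd.persist().countAsync()
    sem.acquire(1)                          // wait for tasks to start
    sched.decommissionExecutor(execs.head)  // decommission one executor
    assert(ThreadUtils.awaitResult(asyncCount, 3.seconds) === 10)
    assert(accum.value === 10)              // no partition was recomputed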
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index eb875dc..8d06768 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1706,64 +1706,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo)))
}
- test("test decommission block manager should not be part of peers") {
- val exec1 = "exec1"
- val exec2 = "exec2"
- val exec3 = "exec3"
- val store1 = makeBlockManager(2000, exec1)
- val store2 = makeBlockManager(2000, exec2)
- val store3 = makeBlockManager(2000, exec3)
-
- assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec1, exec2))
-
- val data = new Array[Byte](400)
- val blockId = rdd(0, 0)
- store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2)
- assert(master.getLocations(blockId).size === 2)
-
- master.decommissionBlockManagers(Seq(exec1))
- // store1 is decommissioned, so it should not be part of peer list for store3
- assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec2))
- }
-
- test("test decommissionRddCacheBlocks should offload all cached blocks") {
- val store1 = makeBlockManager(2000, "exec1")
- val store2 = makeBlockManager(2000, "exec2")
- val store3 = makeBlockManager(2000, "exec3")
-
- val data = new Array[Byte](400)
- val blockId = rdd(0, 0)
- store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2)
- assert(master.getLocations(blockId).size === 2)
- assert(master.getLocations(blockId).contains(store1.blockManagerId))
-
- store1.decommissionRddCacheBlocks()
- assert(master.getLocations(blockId).size === 2)
- assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId,
- store3.blockManagerId))
- }
-
- test("test decommissionRddCacheBlocks should keep the block if it is not able to offload") {
- val store1 = makeBlockManager(12000, "exec1")
- val store2 = makeBlockManager(2000, "exec2")
-
- val dataLarge = new Array[Byte](5000)
- val blockIdLarge = rdd(0, 0)
- val dataSmall = new Array[Byte](500)
- val blockIdSmall = rdd(0, 1)
-
- store1.putSingle(blockIdLarge, dataLarge, StorageLevel.MEMORY_ONLY)
- store1.putSingle(blockIdSmall, dataSmall, StorageLevel.MEMORY_ONLY)
- assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
- assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId))
-
- store1.decommissionRddCacheBlocks()
- // Smaller block offloaded to store2
- assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId))
- // Larger block still present in store1 as it can't be offloaded
- assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
- }
-
class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
var numCalls = 0
var tempFileManager: DownloadFileManager = null