You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2020/04/25 01:52:06 UTC

[spark] branch master updated: HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks ..."

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9faad07  HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks ..."
9faad07 is described below

commit 9faad07ce706890008a8a3ce675fa95b0bdf7c14
Author: Holden Karau <hk...@apple.com>
AuthorDate: Fri Apr 24 18:51:25 2020 -0700

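    HOTFIX Revert "[SPARK-20732][CORE] Decommission cache blocks ..."
    
    Hotfix for a test issue introduced in SPARK-20732: this reverts the block manager
    decommissioning change (configs, RPC messages, implementation and tests).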
    
    Closes #28337 from holdenk/revert-SPARK-20732.
    
    Authored-by: Holden Karau <hk...@apple.com>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../org/apache/spark/internal/config/package.scala |  28 -----
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  17 +--
 .../org/apache/spark/storage/BlockManager.scala    | 129 ++-------------------
 .../apache/spark/storage/BlockManagerMaster.scala  |  10 --
 .../spark/storage/BlockManagerMasterEndpoint.scala |  50 +-------
 .../spark/storage/BlockManagerMessages.scala       |   7 --
 .../spark/storage/BlockManagerSlaveEndpoint.scala  |   3 -
 .../storage/BlockManagerDecommissionSuite.scala    | 104 -----------------
 .../apache/spark/storage/BlockManagerSuite.scala   |  58 ---------
 9 files changed, 12 insertions(+), 394 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1bc2734..5006da0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -413,34 +413,6 @@ package object config {
       .intConf
       .createWithDefault(1)
 
-  private[spark] val STORAGE_DECOMMISSION_ENABLED =
-    ConfigBuilder("spark.storage.decommission.enabled")
-      .doc("Whether to decommission the block manager when decommissioning executor")
-      .version("3.1.0")
-      .booleanConf
-      .createWithDefault(false)
-
-  private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
-    ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
-      .internal()
-      .doc("Maximum number of failures which can be handled for the replication of " +
-        "one RDD block when block manager is decommissioning and trying to move its " +
-        "existing blocks.")
-      .version("3.1.0")
-      .intConf
-      .createWithDefault(3)
-
-  private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
-    ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
-      .internal()
-      .doc("The interval of time between consecutive cache block replication reattempts " +
-        "happening on each decommissioning executor (due to storage decommissioning).")
-      .version("3.1.0")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .checkValue(_ > 0, "Time interval between two consecutive attempts of " +
-        "cache block replication should be positive.")
-      .createWithDefaultString("30s")
-
   private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
     ConfigBuilder("spark.storage.replication.topologyFile")
       .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 67638a5..701d69b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -438,19 +438,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             logError(s"Unexpected error during decommissioning ${e.toString}", e)
         }
         logInfo(s"Finished decommissioning executor $executorId.")
-
-        if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-          try {
-            logInfo("Starting decommissioning block manager corresponding to " +
-              s"executor $executorId.")
-            scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-          } catch {
-            case e: Exception =>
-              logError("Unexpected error during block manager " +
-                s"decommissioning for executor $executorId: ${e.toString}", e)
-          }
-          logInfo(s"Acknowledged decommissioning block manager corresponding to $executorId.")
-        }
       } else {
         logInfo(s"Skipping decommissioning of executor $executorId.")
       }
@@ -587,7 +574,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   private[spark] def decommissionExecutor(executorId: String): Unit = {
     if (driverEndpoint != null) {
-      logInfo("Propagating executor decommission to driver.")
+      logInfo("Propegating executor decommission to driver.")
       driverEndpoint.send(DecommissionExecutor(executorId))
     }
   }
@@ -671,7 +658,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Update the cluster manager on our scheduling needs. Three bits of information are included
    * to help it make decisions.
-   * @param resourceProfileIdToNumExecutors The total number of executors we'd like to have per
+   * @param resourceProfileToNumExecutors The total number of executors we'd like to have per
    *                                      ResourceProfile. The cluster manager shouldn't kill any
    *                                      running executor to reach this number, but, if all
    *                                      existing executors were to die, this is the number
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index aa15d12..e7f8de5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -54,7 +54,6 @@ import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
-import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
 import org.apache.spark.storage.memory._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
@@ -242,9 +241,6 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
-  private var blockManagerDecommissioning: Boolean = false
-  private var decommissionManager: Option[BlockManagerDecommissionManager] = None
-
   // A DownloadFileManager used to track all the files of remote blocks which are above the
   // specified memory threshold. Files will be deleted automatically based on weak reference.
   // Exposed for test
@@ -1555,22 +1551,18 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicates a block to peer block managers based on existingReplicas and maxReplicas
+   * Called for pro-active replenishment of blocks lost due to executor failures
    *
    * @param blockId blockId being replicate
    * @param existingReplicas existing block managers that have a replica
    * @param maxReplicas maximum replicas needed
-   * @param maxReplicationFailures number of replication failures to tolerate before
-   *                               giving up.
-   * @return whether block was successfully replicated or not
    */
   def replicateBlock(
       blockId: BlockId,
       existingReplicas: Set[BlockManagerId],
-      maxReplicas: Int,
-      maxReplicationFailures: Option[Int] = None): Boolean = {
+      maxReplicas: Int): Unit = {
     logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
-    blockInfoManager.lockForReading(blockId).forall { info =>
+    blockInfoManager.lockForReading(blockId).foreach { info =>
       val data = doGetLocalBytes(blockId, info)
       val storageLevel = StorageLevel(
         useDisk = info.level.useDisk,
@@ -1578,13 +1570,11 @@ private[spark] class BlockManager(
         useOffHeap = info.level.useOffHeap,
         deserialized = info.level.deserialized,
         replication = maxReplicas)
-      // we know we are called as a result of an executor removal or because the current executor
-      // is getting decommissioned. so we refresh peer cache before trying replication, we won't
-      // try to replicate to a missing executor/another decommissioning executor
+      // we know we are called as a result of an executor removal, so we refresh peer cache
+      // this way, we won't try to replicate to a missing executor with a stale reference
       getPeers(forceFetch = true)
       try {
-        replicate(
-          blockId, data, storageLevel, info.classTag, existingReplicas, maxReplicationFailures)
+        replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
       } finally {
         logDebug(s"Releasing lock for $blockId")
         releaseLockAndDispose(blockId, data)
@@ -1601,11 +1591,9 @@ private[spark] class BlockManager(
       data: BlockData,
       level: StorageLevel,
       classTag: ClassTag[_],
-      existingReplicas: Set[BlockManagerId] = Set.empty,
-      maxReplicationFailures: Option[Int] = None): Boolean = {
+      existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
 
-    val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
-      conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
+    val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
     val tLevel = StorageLevel(
       useDisk = level.useDisk,
       useMemory = level.useMemory,
@@ -1629,7 +1617,7 @@ private[spark] class BlockManager(
       blockId,
       numPeersToReplicateTo)
 
-    while(numFailures <= maxReplicationFailureCount &&
+    while(numFailures <= maxReplicationFailures &&
       !peersForReplication.isEmpty &&
       peersReplicatedTo.size < numPeersToReplicateTo) {
       val peer = peersForReplication.head
@@ -1677,11 +1665,9 @@ private[spark] class BlockManager(
     if (peersReplicatedTo.size < numPeersToReplicateTo) {
       logWarning(s"Block $blockId replicated to only " +
         s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
-      return false
     }
 
     logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
-    return true
   }
 
   /**
@@ -1775,58 +1761,6 @@ private[spark] class BlockManager(
     blocksToRemove.size
   }
 
-  def decommissionBlockManager(): Unit = {
-    if (!blockManagerDecommissioning) {
-      logInfo("Starting block manager decommissioning process")
-      blockManagerDecommissioning = true
-      decommissionManager = Some(new BlockManagerDecommissionManager(conf))
-      decommissionManager.foreach(_.start())
-    } else {
-      logDebug("Block manager already in decommissioning state")
-    }
-  }
-
-  /**
-   * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
-   * Visible for testing
-   */
-  def decommissionRddCacheBlocks(): Unit = {
-    val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)
-
-    if (replicateBlocksInfo.nonEmpty) {
-      logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
-        "for block manager decommissioning")
-    }
-
-    // Maximum number of storage replication failure which replicateBlock can handle
-    val maxReplicationFailures = conf.get(
-      config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
-
-    // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
-    //   so that we end up prioritize them over each other
-    val blocksFailedReplication = ThreadUtils.parmap(
-      replicateBlocksInfo, "decommissionRddCacheBlocks", 4) {
-      case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
-        val replicatedSuccessfully = replicateBlock(
-          blockId,
-          existingReplicas.toSet,
-          maxReplicas,
-          maxReplicationFailures = Some(maxReplicationFailures))
-        if (replicatedSuccessfully) {
-          logInfo(s"Block $blockId offloaded successfully, Removing block now")
-          removeBlock(blockId)
-          logInfo(s"Block $blockId removed")
-        } else {
-          logWarning(s"Failed to offload block $blockId")
-        }
-        (blockId, replicatedSuccessfully)
-    }.filterNot(_._2).map(_._1)
-    if (blocksFailedReplication.nonEmpty) {
-      logWarning("Blocks failed replication in cache decommissioning " +
-        s"process: ${blocksFailedReplication.mkString(",")}")
-    }
-  }
-
   /**
    * Remove all blocks belonging to the given broadcast.
    */
@@ -1895,52 +1829,7 @@ private[spark] class BlockManager(
     data.dispose()
   }
 
-  /**
-   * Class to handle block manager decommissioning retries
-   * It creates a Thread to retry offloading all RDD cache blocks
-   */
-  private class BlockManagerDecommissionManager(conf: SparkConf) {
-    @volatile private var stopped = false
-    private val blockReplicationThread = new Thread {
-      override def run(): Unit = {
-        while (blockManagerDecommissioning && !stopped) {
-          try {
-            logDebug("Attempting to replicate all cached RDD blocks")
-            decommissionRddCacheBlocks()
-            logInfo("Attempt to replicate all cached blocks done")
-            val sleepInterval = conf.get(
-              config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
-            Thread.sleep(sleepInterval)
-          } catch {
-            case _: InterruptedException =>
-              // no-op
-            case NonFatal(e) =>
-              logError("Error occurred while trying to " +
-                "replicate cached RDD blocks for block manager decommissioning", e)
-          }
-        }
-      }
-    }
-    blockReplicationThread.setDaemon(true)
-    blockReplicationThread.setName("block-replication-thread")
-
-    def start(): Unit = {
-      logInfo("Starting block replication thread")
-      blockReplicationThread.start()
-    }
-
-    def stop(): Unit = {
-      if (!stopped) {
-        stopped = true
-        logInfo("Stopping block replication thread")
-        blockReplicationThread.interrupt()
-        blockReplicationThread.join()
-      }
-    }
-  }
-
   def stop(): Unit = {
-    decommissionManager.foreach(_.stop())
     blockTransferService.close()
     if (blockStoreClient ne blockTransferService) {
       // Closing should be idempotent, but maybe not for the NioBlockTransferService.
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 3cfa5d2..e440c1a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -43,16 +43,6 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
-  /** Decommission block managers corresponding to given set of executors */
-  def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
-    driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
-  }
-
-  /** Get Replication Info for all the RDD blocks stored in given blockManagerId */
-  def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
-    driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId))
-  }
-
   /** Request removal of a dead executor from the driver endpoint.
    *  This is only called on the driver side. Non-blocking
    */
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d936420..d7f7eed 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -65,9 +65,6 @@ class BlockManagerMasterEndpoint(
   // Mapping from executor ID to block manager ID.
   private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
 
-  // Set of block managers which are decommissioning
-  private val decommissioningBlockManagerSet = new mutable.HashSet[BlockManagerId]
-
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
@@ -156,13 +153,6 @@ class BlockManagerMasterEndpoint(
       removeExecutor(execId)
       context.reply(true)
 
-    case DecommissionBlockManagers(executorIds) =>
-      decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get))
-      context.reply(true)
-
-    case GetReplicateInfoForRDDBlocks(blockManagerId) =>
-      context.reply(getReplicateInfoForRDDBlocks(blockManagerId))
-
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
@@ -267,7 +257,6 @@ class BlockManagerMasterEndpoint(
 
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
-    decommissioningBlockManagerSet.remove(blockManagerId)
 
     // Remove it from blockManagerInfo and remove all the blocks.
     blockManagerInfo.remove(blockManagerId)
@@ -310,39 +299,6 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
-  /**
-   * Decommission the given Seq of blockmanagers
-   *    - Adds these block managers to decommissioningBlockManagerSet Set
-   *    - Sends the DecommissionBlockManager message to each of the [[BlockManagerSlaveEndpoint]]
-   */
-  def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): Future[Seq[Unit]] = {
-    val newBlockManagersToDecommission = blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
-    val futures = newBlockManagersToDecommission.map { blockManagerId =>
-      decommissioningBlockManagerSet.add(blockManagerId)
-      val info = blockManagerInfo(blockManagerId)
-      info.slaveEndpoint.ask[Unit](DecommissionBlockManager)
-    }
-    Future.sequence{ futures.toSeq }
-  }
-
-  /**
-   * Returns a Seq of ReplicateBlock for each RDD block stored by given blockManagerId
-   * @param blockManagerId - block manager id for which ReplicateBlock info is needed
-   * @return Seq of ReplicateBlock
-   */
-  private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): Seq[ReplicateBlock] = {
-    val info = blockManagerInfo(blockManagerId)
-
-    val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
-    rddBlocks.map { blockId =>
-      val currentBlockLocations = blockLocations.get(blockId)
-      val maxReplicas = currentBlockLocations.size + 1
-      val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != blockManagerId)
-      val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
-      replicateMsg
-    }.toSeq
-  }
-
   // Remove a block from the slaves that have it. This can only be used to remove
   // blocks that the master knows about.
   private def removeBlockFromWorkers(blockId: BlockId): Unit = {
@@ -580,11 +536,7 @@ class BlockManagerMasterEndpoint(
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
     val blockManagerIds = blockManagerInfo.keySet
     if (blockManagerIds.contains(blockManagerId)) {
-      blockManagerIds
-        .filterNot { _.isDriver }
-        .filterNot { _ == blockManagerId }
-        .diff(decommissioningBlockManagerSet)
-        .toSeq
+      blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
     } else {
       Seq.empty
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 7d4f2ff..895f48d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -36,8 +36,6 @@ private[spark] object BlockManagerMessages {
   case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
     extends ToBlockManagerSlave
 
-  case object DecommissionBlockManager extends ToBlockManagerSlave
-
   // Remove all blocks belonging to a specific RDD.
   case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
 
@@ -127,11 +125,6 @@ private[spark] object BlockManagerMessages {
 
   case object GetStorageStatus extends ToBlockManagerMaster
 
-  case class DecommissionBlockManagers(executorIds: Seq[String]) extends ToBlockManagerMaster
-
-  case class GetReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId)
-    extends ToBlockManagerMaster
-
   case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
     extends ToBlockManagerMaster
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index a3a7149..29e2114 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -61,9 +61,6 @@ class BlockManagerSlaveEndpoint(
         SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
       }
 
-    case DecommissionBlockManager =>
-      context.reply(blockManager.decommissionBlockManager())
-
     case RemoveBroadcast(broadcastId, _) =>
       doAsync[Int]("removing broadcast " + broadcastId, context) {
         blockManager.removeBroadcast(broadcastId, tellMaster = true)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
deleted file mode 100644
index 59fb056..0000000
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.storage
-
-import java.util.concurrent.Semaphore
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.duration._
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success}
-import org.apache.spark.internal.config
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
-import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.util.ThreadUtils
-
-class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
-
-  override def beforeEach(): Unit = {
-    val conf = new SparkConf().setAppName("test").setMaster("local")
-      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
-      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L)
-      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
-
-    sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf)
-  }
-
-  test(s"verify that an already running task which is going to cache data succeeds " +
-    s"on a decommissioned executor") {
-    // Create input RDD with 10 partitions
-    val input = sc.parallelize(1 to 10, 10)
-    val accum = sc.longAccumulator("mapperRunAccumulator")
-    // Do a count to wait for the executors to be registered.
-    input.count()
-
-    // Create a new RDD where we have sleep in each partition, we are also increasing
-    // the value of accumulator in each partition
-    val sleepyRdd = input.mapPartitions { x =>
-      Thread.sleep(500)
-      accum.add(1)
-      x
-    }
-
-    // Listen for the job
-    val sem = new Semaphore(0)
-    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
-    sc.addSparkListener(new SparkListener {
-      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-       sem.release()
-      }
-
-      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-        taskEndEvents.append(taskEnd)
-      }
-    })
-
-    // Cache the RDD lazily
-    sleepyRdd.persist()
-
-    // Start the computation of RDD - this step will also cache the RDD
-    val asyncCount = sleepyRdd.countAsync()
-
-    // Wait for the job to have started
-    sem.acquire(1)
-
-    // Decommission one of the executor
-    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
-    val execs = sched.getExecutorIds()
-    assert(execs.size == 3, s"Expected 3 executors but found ${execs.size}")
-    val execToDecommission = execs.head
-    sched.decommissionExecutor(execToDecommission)
-
-    // Wait for job to finish
-    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 3.seconds)
-    assert(asyncCountResult === 10)
-    // All 10 tasks finished, so accum should have been increased 10 times
-    assert(accum.value === 10)
-
-    // All tasks should be successful, nothing should have failed
-    sc.listenerBus.waitUntilEmpty()
-    assert(taskEndEvents.size === 10) // 10 mappers
-    assert(taskEndEvents.map(_.reason).toSet === Set(Success))
-
-    // Since the RDD is cached, so further usage of same RDD should use the
-    // cached data. Original RDD partitions should not be recomputed i.e. accum
-    // should have same value like before
-    assert(sleepyRdd.count() === 10)
-    assert(accum.value === 10)
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index eb875dc..8d06768 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1706,64 +1706,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     verify(liveListenerBus, never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo)))
   }
 
-  test("test decommission block manager should not be part of peers") {
-    val exec1 = "exec1"
-    val exec2 = "exec2"
-    val exec3 = "exec3"
-    val store1 = makeBlockManager(2000, exec1)
-    val store2 = makeBlockManager(2000, exec2)
-    val store3 = makeBlockManager(2000, exec3)
-
-    assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec1, exec2))
-
-    val data = new Array[Byte](400)
-    val blockId = rdd(0, 0)
-    store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2)
-    assert(master.getLocations(blockId).size === 2)
-
-    master.decommissionBlockManagers(Seq(exec1))
-    // store1 is decommissioned, so it should not be part of peer list for store3
-    assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === Set(exec2))
-  }
-
-  test("test decommissionRddCacheBlocks should offload all cached blocks") {
-    val store1 = makeBlockManager(2000, "exec1")
-    val store2 = makeBlockManager(2000, "exec2")
-    val store3 = makeBlockManager(2000, "exec3")
-
-    val data = new Array[Byte](400)
-    val blockId = rdd(0, 0)
-    store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2)
-    assert(master.getLocations(blockId).size === 2)
-    assert(master.getLocations(blockId).contains(store1.blockManagerId))
-
-    store1.decommissionRddCacheBlocks()
-    assert(master.getLocations(blockId).size === 2)
-    assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId,
-      store3.blockManagerId))
-  }
-
-  test("test decommissionRddCacheBlocks should keep the block if it is not able to offload") {
-    val store1 = makeBlockManager(12000, "exec1")
-    val store2 = makeBlockManager(2000, "exec2")
-
-    val dataLarge = new Array[Byte](5000)
-    val blockIdLarge = rdd(0, 0)
-    val dataSmall = new Array[Byte](500)
-    val blockIdSmall = rdd(0, 1)
-
-    store1.putSingle(blockIdLarge, dataLarge, StorageLevel.MEMORY_ONLY)
-    store1.putSingle(blockIdSmall, dataSmall, StorageLevel.MEMORY_ONLY)
-    assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
-    assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId))
-
-    store1.decommissionRddCacheBlocks()
-    // Smaller block offloaded to store2
-    assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId))
-    // Larger block still present in store1 as it can't be offloaded
-    assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
-  }
-
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
     var tempFileManager: DownloadFileManager = null


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org