You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/02/24 23:40:03 UTC

spark git commit: [SPARK-15355][CORE] Proactive block replication

Repository: spark
Updated Branches:
  refs/heads/master 330c3e33b -> fa7c582e9


[SPARK-15355][CORE] Proactive block replication

## What changes were proposed in this pull request?

We are proposing addition of pro-active block replication in case of executor failures. BlockManagerMasterEndpoint does all the book-keeping to keep a track of all the executors and the blocks they hold. It also keeps a track of which executors are alive through heartbeats. When an executor is removed, all this book-keeping state is updated to reflect the lost executor. This step can be used to identify executors that are still in possession of a copy of the cached data and a message could be sent to them to use the existing "replicate" function to find and place new replicas on other suitable hosts. Blocks replicated this way will let the master know of their existence.

This can happen when an executor is lost, and would that way be pro-active as opposed be being done at query time.
## How was this patch tested?

This patch was tested with existing unit tests along with new unit tests added to test the functionality.

Author: Shubham Chopra <sc...@bloomberg.net>

Closes #14412 from shubhamchopra/ProactiveBlockReplication.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa7c582e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa7c582e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa7c582e

Branch: refs/heads/master
Commit: fa7c582e9442b985a0493fb1dd15b3fb9b6031b4
Parents: 330c3e3
Author: Shubham Chopra <sc...@bloomberg.net>
Authored: Fri Feb 24 15:40:01 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Feb 24 15:40:01 2017 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 43 +++++++--
 .../storage/BlockManagerMasterEndpoint.scala    | 24 +++++
 .../spark/storage/BlockManagerMessages.scala    |  4 +
 .../storage/BlockManagerSlaveEndpoint.scala     |  4 +
 .../storage/BlockManagerReplicationSuite.scala  | 95 ++++++++++++++++----
 docs/configuration.md                           |  9 ++
 6 files changed, 154 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa7c582e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6946a98..45b7338 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1160,6 +1160,34 @@ private[spark] class BlockManager(
   }
 
   /**
+   * Called for pro-active replenishment of blocks lost due to executor failures
+   *
+   * @param blockId blockId being replicate
+   * @param existingReplicas existing block managers that have a replica
+   * @param maxReplicas maximum replicas needed
+   */
+  def replicateBlock(
+      blockId: BlockId,
+      existingReplicas: Set[BlockManagerId],
+      maxReplicas: Int): Unit = {
+    logInfo(s"Pro-actively replicating $blockId")
+    blockInfoManager.lockForReading(blockId).foreach { info =>
+      val data = doGetLocalBytes(blockId, info)
+      val storageLevel = StorageLevel(
+        useDisk = info.level.useDisk,
+        useMemory = info.level.useMemory,
+        useOffHeap = info.level.useOffHeap,
+        deserialized = info.level.deserialized,
+        replication = maxReplicas)
+      try {
+        replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
+      } finally {
+        releaseLock(blockId)
+      }
+    }
+  }
+
+  /**
    * Replicate block to another node. Note that this is a blocking call that returns after
    * the block has been replicated.
    */
@@ -1167,7 +1195,8 @@ private[spark] class BlockManager(
       blockId: BlockId,
       data: ChunkedByteBuffer,
       level: StorageLevel,
-      classTag: ClassTag[_]): Unit = {
+      classTag: ClassTag[_],
+      existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
 
     val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
     val tLevel = StorageLevel(
@@ -1181,20 +1210,22 @@ private[spark] class BlockManager(
 
     val startTime = System.nanoTime
 
-    var peersReplicatedTo = mutable.HashSet.empty[BlockManagerId]
+    var peersReplicatedTo = mutable.HashSet.empty ++ existingReplicas
     var peersFailedToReplicateTo = mutable.HashSet.empty[BlockManagerId]
     var numFailures = 0
 
+    val initialPeers = getPeers(false).filterNot(existingReplicas.contains(_))
+
     var peersForReplication = blockReplicationPolicy.prioritize(
       blockManagerId,
-      getPeers(false),
-      mutable.HashSet.empty,
+      initialPeers,
+      peersReplicatedTo,
       blockId,
       numPeersToReplicateTo)
 
     while(numFailures <= maxReplicationFailures &&
-        !peersForReplication.isEmpty &&
-        peersReplicatedTo.size != numPeersToReplicateTo) {
+      !peersForReplication.isEmpty &&
+      peersReplicatedTo.size < numPeersToReplicateTo) {
       val peer = peersForReplication.head
       try {
         val onePeerStartTime = System.nanoTime

http://git-wip-us.apache.org/repos/asf/spark/blob/fa7c582e/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 145c434..84c04d2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -22,6 +22,7 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
+import scala.util.Random
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +66,8 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
+  val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean
+
   logInfo("BlockManagerMasterEndpoint up")
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -195,17 +198,38 @@ class BlockManagerMasterEndpoint(
 
     // Remove it from blockManagerInfo and remove all the blocks.
     blockManagerInfo.remove(blockManagerId)
+
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
       val locations = blockLocations.get(blockId)
       locations -= blockManagerId
+      // De-register the block if none of the block managers have it. Otherwise, if pro-active
+      // replication is enabled, and a block is either an RDD or a test block (the latter is used
+      // for unit testing), we send a message to a randomly chosen executor location to replicate
+      // the given block. Note that we ignore other block types (such as broadcast/shuffle blocks
+      // etc.) as replication doesn't make much sense in that context.
       if (locations.size == 0) {
         blockLocations.remove(blockId)
+        logWarning(s"No more replicas available for $blockId !")
+      } else if (proactivelyReplicate && (blockId.isRDD || blockId.isInstanceOf[TestBlockId])) {
+        // As a heursitic, assume single executor failure to find out the number of replicas that
+        // existed before failure
+        val maxReplicas = locations.size + 1
+        val i = (new Random(blockId.hashCode)).nextInt(locations.size)
+        val blockLocations = locations.toSeq
+        val candidateBMId = blockLocations(i)
+        blockManagerInfo.get(candidateBMId).foreach { bm =>
+          val remainingLocations = locations.toSeq.filter(bm => bm != candidateBMId)
+          val replicateMsg = ReplicateBlock(blockId, remainingLocations, maxReplicas)
+          bm.slaveEndpoint.ask[Boolean](replicateMsg)
+        }
       }
     }
+
     listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
     logInfo(s"Removing block manager $blockManagerId")
+
   }
 
   private def removeExecutor(execId: String) {

http://git-wip-us.apache.org/repos/asf/spark/blob/fa7c582e/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index d71acbb..0aea438 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -32,6 +32,10 @@ private[spark] object BlockManagerMessages {
   // blocks that the master knows about.
   case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave
 
+  // Replicate blocks that were lost due to executor failure
+  case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], maxReplicas: Int)
+    extends ToBlockManagerSlave
+
   // Remove all blocks belonging to a specific RDD.
   case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fa7c582e/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index d17ddbc..1aaa424 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -74,6 +74,10 @@ class BlockManagerSlaveEndpoint(
 
     case TriggerThreadDump =>
       context.reply(Utils.getThreadDump())
+
+    case ReplicateBlock(blockId, replicas, maxReplicas) =>
+      context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))
+
   }
 
   private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T) {

http://git-wip-us.apache.org/repos/asf/spark/blob/fa7c582e/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index f4bfdc2..ccede34 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -37,32 +37,31 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
-/** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite
-    with Matchers
-    with BeforeAndAfter
-    with LocalSparkContext {
-
-  private val conf = new SparkConf(false).set("spark.app.id", "test")
-  private var rpcEnv: RpcEnv = null
-  private var master: BlockManagerMaster = null
-  private val securityMgr = new SecurityManager(conf)
-  private val bcastManager = new BroadcastManager(true, conf, securityMgr)
-  private val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
-  private val shuffleManager = new SortShuffleManager(conf)
+trait BlockManagerReplicationBehavior extends SparkFunSuite
+  with Matchers
+  with BeforeAndAfter
+  with LocalSparkContext {
+
+  val conf: SparkConf
+  protected var rpcEnv: RpcEnv = null
+  protected var master: BlockManagerMaster = null
+  protected lazy val securityMgr = new SecurityManager(conf)
+  protected lazy val bcastManager = new BroadcastManager(true, conf, securityMgr)
+  protected lazy val mapOutputTracker = new MapOutputTrackerMaster(conf, bcastManager, true)
+  protected lazy val shuffleManager = new SortShuffleManager(conf)
 
   // List of block manager created during an unit test, so that all of the them can be stopped
   // after the unit test.
-  private val allStores = new ArrayBuffer[BlockManager]
+  protected val allStores = new ArrayBuffer[BlockManager]
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  conf.set("spark.kryoserializer.buffer", "1m")
-  private val serializer = new KryoSerializer(conf)
+
+  protected lazy val serializer = new KryoSerializer(conf)
 
   // Implicitly convert strings to BlockIds for test clarity.
-  private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+  protected implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
 
-  private def makeBlockManager(
+  protected def makeBlockManager(
       maxMem: Long,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     conf.set("spark.testing.memory", maxMem.toString)
@@ -355,7 +354,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
    * is correct. Then it also drops the block from memory of each store (using LRU) and
    * again checks whether the master's knowledge gets updated.
    */
-  private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+  protected def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
     import org.apache.spark.storage.StorageLevel._
 
     assert(maxReplication > 1,
@@ -448,3 +447,61 @@ class BlockManagerReplicationSuite extends SparkFunSuite
     }
   }
 }
+
+class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
+  val conf = new SparkConf(false).set("spark.app.id", "test")
+  conf.set("spark.kryoserializer.buffer", "1m")
+}
+
+class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
+  val conf = new SparkConf(false).set("spark.app.id", "test")
+  conf.set("spark.kryoserializer.buffer", "1m")
+  conf.set("spark.storage.replication.proactive", "true")
+  conf.set("spark.storage.exceptionOnPinLeak", "true")
+
+  (2 to 5).foreach{ i =>
+    test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
+      testProactiveReplication(i)
+    }
+  }
+
+  def testProactiveReplication(replicationFactor: Int) {
+    val blockSize = 1000
+    val storeSize = 10000
+    val initialStores = (1 to 10).map { i => makeBlockManager(storeSize, s"store$i") }
+
+    val blockId = "a1"
+
+    val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
+    initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+
+    val blockLocations = master.getLocations(blockId)
+    logInfo(s"Initial locations : $blockLocations")
+
+    assert(blockLocations.size === replicationFactor)
+
+    // remove a random blockManager
+    val executorsToRemove = blockLocations.take(replicationFactor - 1)
+    logInfo(s"Removing $executorsToRemove")
+    executorsToRemove.foreach{exec =>
+      master.removeExecutor(exec.executorId)
+      // giving enough time for replication to happen and new block be reported to master
+      Thread.sleep(200)
+    }
+
+    // giving enough time for replication complete and locks released
+    Thread.sleep(500)
+
+    val newLocations = master.getLocations(blockId).toSet
+    logInfo(s"New locations : $newLocations")
+    assert(newLocations.size === replicationFactor)
+    // there should only be one common block manager between initial and new locations
+    assert(newLocations.intersect(blockLocations.toSet).size === 1)
+
+    // check if all the read locks have been released
+    initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
+      val locks = bm.releaseAllLocksForTask(BlockInfo.NON_TASK_WRITER)
+      assert(locks.size === 0, "Read locks unreleased!")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fa7c582e/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 2fcb3a0..63392a7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1000,6 +1000,15 @@ Apart from these, the following properties are also available, and may be useful
     storage space to unroll the new block in its entirety.
   </td>
 </tr>
+<tr>
+  <td><code>spark.storage.replication.proactive<code></td>
+  <td>false</td>
+  <td>
+    Enables proactive block replication for RDD blocks. Cached RDD block replicas lost due to
+    executor failures are replenished if there are any existing available replicas. This tries
+    to get the replication level of the block to the initial number.
+  </td>
+</tr>
 </table>
 
 ### Execution Behavior


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org