You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2014/11/11 03:23:07 UTC

spark git commit: [SPARK-3495][SPARK-3496] Backporting block replication fixes made in master to branch 1.1

Repository: spark
Updated Branches:
  refs/heads/branch-1.1 3d889dfc1 -> be0cc9952


[SPARK-3495][SPARK-3496] Backporting block replication fixes made in master to branch 1.1

The original PR was #2366

This backport was non-trivial because Spark 1.1 uses ConnectionManager instead of NioBlockTransferService, which required slight modifications to the unit tests. Other than that, the code is exactly the same as in the original PR. Please refer to the discussion in the original PR if you have any thoughts.

Author: Tathagata Das <ta...@gmail.com>

Closes #3191 from tdas/replication-fix-branch-1.1-backport and squashes the following commits:

593214a [Tathagata Das] Merge remote-tracking branch 'apache-github/branch-1.1' into branch-1.1
2ed927f [Tathagata Das] Fixed error in unit test.
de4ff73 [Tathagata Das] [SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be0cc995
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be0cc995
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be0cc995

Branch: refs/heads/branch-1.1
Commit: be0cc9952d6c8b4cfe9ff10a761e0677cba64489
Parents: 3d889df
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Nov 10 18:23:02 2014 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Nov 10 18:23:02 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 120 +++++-
 .../apache/spark/storage/BlockManagerId.scala   |   4 +-
 .../spark/storage/BlockManagerMaster.scala      |   9 +-
 .../spark/storage/BlockManagerMasterActor.scala |  29 +-
 .../spark/storage/BlockManagerMessages.scala    |   2 +-
 .../apache/spark/broadcast/BroadcastSuite.scala |   2 +-
 .../storage/BlockManagerReplicationSuite.scala  | 410 +++++++++++++++++++
 .../spark/storage/BlockManagerSuite.scala       |   3 +-
 8 files changed, 535 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be0cc995/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b9501c3..3113d4a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -139,6 +139,11 @@ private[spark] class BlockManager(
   private val broadcastCleaner = new MetadataCleaner(
     MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
 
+  // Fields related to peer block managers that are necessary for block replication
+  @volatile private var cachedPeers: Seq[BlockManagerId] = _
+  private val peerFetchLock = new Object
+  private var lastPeerFetchTime = 0L
+
   initialize()
 
   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -822,28 +827,111 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Replicate block to another node.
+   * Get peer block managers in the system.
+   */
+  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
+    peerFetchLock.synchronized {
+      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
+      val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+      if (cachedPeers == null || forceFetch || timeout) {
+        cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
+      cachedPeers
+    }
+  }
+
+  /**
+   * Replicate block to another node. Note that this is a blocking call that returns after
+   * the block has been replicated.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersForReplication = new ArrayBuffer[BlockManagerId]
+    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
+    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val startTime = System.currentTimeMillis
+    val random = new Random(blockId.hashCode)
+
+    var replicationFailed = false
+    var failures = 0
+    var done = false
+
+    // Get cached list of peers
+    peersForReplication ++= getPeers(forceFetch = false)
+
+    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
+    // So assuming the list of peers does not change and no replication failures,
+    // if there are multiple attempts in the same node to replicate the same block,
+    // the same set of peers will be selected.
+    def getRandomPeer(): Option[BlockManagerId] = {
+      // If replication had failed, then force update the cached list of peers and remove the peers
+      // that have been already used
+      if (replicationFailed) {
+        peersForReplication.clear()
+        peersForReplication ++= getPeers(forceFetch = true)
+        peersForReplication --= peersReplicatedTo
+        peersForReplication --= peersFailedToReplicateTo
+      }
+      if (!peersForReplication.isEmpty) {
+        Some(peersForReplication(random.nextInt(peersForReplication.size)))
+      } else {
+        None
+      }
     }
-    for (peer: BlockManagerId <- cachedPeers) {
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
-        s"To node: $peer")
-      val putBlock = PutBlock(blockId, data, tLevel)
-      val cmId = new ConnectionManagerId(peer.host, peer.port)
-      val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
-      if (!syncPutBlockSuccess) {
-        logError(s"Failed to call syncPutBlock to $peer")
+
+    // One by one choose a random peer and try uploading the block to it
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from driver and then pick another random peer for replication. Also
+    // temporarily black list the peer for which replication failed.
+    //
+    // This selection of a peer and replication is continued in a loop until one of the
+    // following 3 conditions is fulfilled:
+    // (i) specified number of peers have been replicated to
+    // (ii) too many failures in replicating to peers
+    // (iii) no peer left to replicate to
+    //
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          val onePeerStartTime = System.currentTimeMillis
+          data.rewind()
+          logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+          val putBlock = PutBlock(blockId, data, tLevel)
+          val cmId = new ConnectionManagerId(peer.host, peer.port)
+          val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
+          if (syncPutBlockSuccess) {
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %d ms"
+              .format((System.currentTimeMillis - onePeerStartTime)))
+            peersReplicatedTo += peer
+            peersForReplication -= peer
+            replicationFailed = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true // specified number of peers have been replicated to
+            }
+          } else {
+            logWarning(s"Failed to replicate $blockId to $peer, failure #$failures")
+            failures += 1
+            replicationFailed = true
+            peersFailedToReplicateTo += peer
+            if (failures > maxReplicationFailures) { // too many failures in replicating to peers
+              done = true
+            }
+          }
+        case None => // no peer left to replicate to
+          done = true
       }
-      logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
-        .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
+    }
+    val timeTakeMs = (System.currentTimeMillis - startTime)
+    logTrace(s"Replicating $blockId of ${data.limit()} bytes to " +
+      s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
+    if (peersReplicatedTo.size < numPeersToReplicateTo) {
+      logWarning(s"Block $blockId replicated to only " +
+        s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be0cc995/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index ffd2a4d..fb9305a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -62,7 +62,9 @@ class BlockManagerId private (
 
   def nettyPort: Int = nettyPort_
 
-  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
+  def isDriver: Boolean = (executorId == "<driver>")
+
+  override def writeExternal(out: ObjectOutput) {
     out.writeUTF(executorId_)
     out.writeUTF(host_)
     out.writeInt(port_)

http://git-wip-us.apache.org/repos/asf/spark/blob/be0cc995/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e67b3dc..c2365ca 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -80,13 +80,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   }
 
   /** Get ids of other nodes in the cluster from the driver */
-  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
-    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
-    if (result.length != numPeers) {
-      throw new SparkException(
-        "Error getting peers, only got " + result.size + " instead of " + numPeers)
-    }
-    result
+  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/be0cc995/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index dc80148..ad1b68b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetLocationsMultipleBlockIds(blockIds) =>
       sender ! getLocationsMultipleBlockIds(blockIds)
 
-    case GetPeers(blockManagerId, size) =>
-      sender ! getPeers(blockManagerId, size)
+    case GetPeers(blockManagerId) =>
+      sender ! getPeers(blockManagerId)
 
     case GetMemoryStatus =>
       sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    * from the executors, but not from the driver.
    */
   private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
-    // TODO: Consolidate usages of <driver>
     import context.dispatcher
     val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
     val requiredBlockManagers = blockManagerInfo.values.filter { info =>
-      removeFromDriver || info.blockManagerId.executorId != "<driver>"
+      removeFromDriver || !info.blockManagerId.isDriver
     }
     Future.sequence(
       requiredBlockManagers.map { bm =>
@@ -213,7 +212,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     val minSeenTime = now - slaveTimeout
     val toRemove = new mutable.HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
-      if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
+      if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
         logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
           + (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
         toRemove += info.blockManagerId
@@ -233,7 +232,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
    */
   private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
     if (!blockManagerInfo.contains(blockManagerId)) {
-      blockManagerId.executorId == "<driver>" && !isLocal
+      blockManagerId.isDriver && !isLocal
     } else {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
       true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       tachyonSize: Long) {
 
     if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.executorId == "<driver>" && !isLocal) {
+      if (blockManagerId.isDriver && !isLocal) {
         // We intentionally do not register the master (except in local mode),
         // so we should not indicate failure.
         sender ! true
@@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }
 
-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
-    if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet
+    if (blockManagerIds.contains(blockManagerId)) {
+      blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
+    } else {
+      Seq.empty
     }
-
-    // Note that this logic will select the same node multiple times if there aren't enough peers
-    Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be0cc995/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 03ba898..291ddfc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -90,7 +90,7 @@ private[spark] object BlockManagerMessages {
 
   case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
 
-  case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
 
   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
 

http://git-wip-us.apache.org/repos/asf/spark/blob/be0cc995/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 978a6de..acaf321 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
       val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
       assert(statuses.size === 1)
       statuses.head match { case (bm, status) =>
-        assert(bm.executorId === "<driver>", "Block should only be on the driver")
+        assert(bm.isDriver, "Block should only be on the driver")
         assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
         assert(status.memSize > 0, "Block should be in memory store on the driver")
         assert(status.diskSize === 0, "Block should not be in disk store on the driver")

http://git-wip-us.apache.org/repos/asf/spark/blob/be0cc995/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
new file mode 100644
index 0000000..a7f7c76
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.language.{implicitConversions, postfixOps}
+
+import akka.actor.{ActorSystem, Props}
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.scheduler.LiveListenerBus
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.shuffle.hash.HashShuffleManager
+import org.apache.spark.storage.StorageLevel._
+import org.apache.spark.util.AkkaUtils
+
+/** Test suite for block replication in BlockManager */
+class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAndAfter {
+
+  private val conf = new SparkConf(false)
+  var actorSystem: ActorSystem = null
+  var master: BlockManagerMaster = null
+  val securityMgr = new SecurityManager(conf)
+  val mapOutputTracker = new MapOutputTrackerMaster(conf)
+  val shuffleManager = new HashShuffleManager(conf)
+
+  // List of block managers created during a unit test, so that all of them can be stopped
+  // after the unit test.
+  val allStores = new ArrayBuffer[BlockManager]
+
+  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+  conf.set("spark.kryoserializer.buffer.mb", "1")
+  val serializer = new KryoSerializer(conf)
+
+  // Implicitly convert strings to BlockIds for test clarity.
+  implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+
+  private def makeBlockManager(maxMem: Long, name: String = "<driver>"): BlockManager = {
+    val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf, securityMgr,
+      mapOutputTracker, shuffleManager)
+    allStores += store
+    store
+  }
+
+  before {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+      "test", "localhost", 0, conf = conf, securityManager = securityMgr)
+    this.actorSystem = actorSystem
+
+    conf.set("spark.authenticate", "false")
+    conf.set("spark.driver.port", boundPort.toString)
+    conf.set("spark.storage.unrollFraction", "0.4")
+    conf.set("spark.storage.unrollMemoryThreshold", "512")
+
+    // to make a replication attempt to inactive store fail fast
+    conf.set("spark.core.connection.ack.wait.timeout", "1")
+    // to make cached peers refresh frequently
+    conf.set("spark.storage.cachedPeersTtl", "10")
+
+    master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
+      conf)
+    allStores.clear()
+  }
+
+  after {
+    allStores.foreach { _.stop() }
+    allStores.clear()
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+    actorSystem = null
+    master = null
+  }
+
+
+  test("get peers with addition and removal of block managers") {
+    val numStores = 4
+    val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") }
+    val storeIds = stores.map { _.blockManagerId }.toSet
+    assert(master.getPeers(stores(0).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(0).blockManagerId })
+    assert(master.getPeers(stores(1).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(1).blockManagerId })
+    assert(master.getPeers(stores(2).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(2).blockManagerId })
+
+    // Add driver store and test whether it is filtered out
+    val driverStore = makeBlockManager(1000, "<driver>")
+    assert(master.getPeers(stores(0).blockManagerId).forall(!_.isDriver))
+    assert(master.getPeers(stores(1).blockManagerId).forall(!_.isDriver))
+    assert(master.getPeers(stores(2).blockManagerId).forall(!_.isDriver))
+
+    // Add a new store and test whether get peers returns it
+    val newStore = makeBlockManager(1000, s"store$numStores")
+    assert(master.getPeers(stores(0).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(stores(1).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(stores(2).blockManagerId).toSet ===
+      storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId)
+    assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+    // Remove a store and test that get peers no longer returns it
+    val storeIdToRemove = stores(0).blockManagerId
+    master.removeExecutor(storeIdToRemove.executorId)
+    assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+    assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+    assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+    // Test whether asking for peers of an unregistered block manager id returns an empty list
+    assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+    assert(master.getPeers(BlockManagerId("", "", 1, 0)).isEmpty)
+  }
+
+  test("block replication - 2x replication") {
+    testReplication(2,
+      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2)
+    )
+  }
+
+  test("block replication - 3x replication") {
+    // Generate storage levels with 3x replication
+    val storageLevels = {
+      Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map {
+        level => StorageLevel(
+          level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3)
+      }
+    }
+    testReplication(3, storageLevels)
+  }
+
+  test("block replication - mixed between 1x to 5x") {
+    // Generate storage levels with varying replication
+    val storageLevels = Seq(
+      MEMORY_ONLY,
+      MEMORY_ONLY_SER_2,
+      StorageLevel(true, false, false, false, 3),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, true, false, false, 5),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, false, false, false, 3),
+      MEMORY_ONLY_SER_2,
+      MEMORY_ONLY
+    )
+    testReplication(5, storageLevels)
+  }
+
+  test("block replication - 2x replication without peers") {
+    intercept[org.scalatest.exceptions.TestFailedException] {
+      testReplication(1,
+        Seq(StorageLevel.MEMORY_AND_DISK_2, StorageLevel(true, false, false, false, 3)))
+    }
+  }
+
+  test("block replication - deterministic node selection") {
+    val blockSize = 1000
+    val storeSize = 10000
+    val stores = (1 to 5).map {
+      i => makeBlockManager(storeSize, s"store$i")
+    }
+    val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2
+    val storageLevel3x = StorageLevel(true, true, false, true, 3)
+    val storageLevel4x = StorageLevel(true, true, false, true, 4)
+
+    def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
+      stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
+      val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
+      stores.foreach { _.removeBlock(blockId) }
+      master.removeBlock(blockId)
+      locations
+    }
+
+    // Test if two attempts to 2x replication returns same set of locations
+    val a1Locs = putBlockAndGetLocations("a1", storageLevel2x)
+    assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs,
+      "Inserting a 2x replicated block second time gave different locations from the first")
+
+    // Test if two attempts to 3x replication returns same set of locations
+    val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x)
+    assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x,
+      "Inserting a 3x replicated block second time gave different locations from the first")
+
+    // Test if 2x replication of a2 returns a strict subset of the locations of 3x replication
+    val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x)
+    assert(
+      a2Locs2x.subsetOf(a2Locs3x),
+      "Inserting a with 2x replication gave locations that are not a subset of locations" +
+        s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}"
+    )
+
+    // Test if 4x replication of a2 returns a strict superset of the locations of 3x replication
+    val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x)
+    assert(
+      a2Locs3x.subsetOf(a2Locs4x),
+      "Inserting a with 4x replication gave locations that are not a superset of locations " +
+        s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}"
+    )
+
+    // Test if 3x replication of two different blocks gives two different sets of locations
+    val a3Locs3x = putBlockAndGetLocations("a3", storageLevel3x)
+    assert(a3Locs3x !== a2Locs3x, "Two blocks gave same locations with 3x replication")
+  }
+
+  test("block replication - replication failures") {
+    /*
+      Create a system of three block managers / stores. One of them (say, failableStore)
+      cannot receive blocks. So attempts to use it as a replication target fail.
+
+            +-----------/fails/-----------> failableStore
+            |
+        normalStore
+            |
+            +-----------/works/-----------> anotherNormalStore
+
+        We are first going to add a normal block manager (i.e. normalStore) and the failable block
+        manager (i.e. failableStore), and test whether 2x replication fails to create two
+        copies of a block. Then we are going to add another normal block manager
+        (i.e., anotherNormalStore), and test that now 2x replication works as the
+        new store will be used for replication.
+     */
+
+    // Add a normal block manager
+    val store = makeBlockManager(10000, "store")
+
+    // Insert a block with 2x replication and return the number of copies of the block
+    def replicateAndGetNumCopies(blockId: String): Int = {
+      store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
+      val numLocations = master.getLocations(blockId).size
+      allStores.foreach { _.removeBlock(blockId) }
+      numLocations
+    }
+
+    // Add a failable block manager whose connection manager is stopped right after creation,
+    // so it cannot receive blocks. Attempts to use it as a replication target will fail.
+    val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
+      10000, conf, securityMgr, mapOutputTracker, shuffleManager)
+    failableStore.connectionManager.stop()  // To disable any transfer to this store
+    allStores += failableStore // so that this gets stopped after test
+    assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
+
+    // Test that 2x replication fails by creating only one copy of the block
+    assert(replicateAndGetNumCopies("a1") === 1)
+
+    // Add another normal block manager and test that 2x replication works
+    makeBlockManager(10000, "anotherStore")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a2") === 2)
+    }
+  }
+
+  test("block replication - addition and deletion of block managers") {
+    val blockSize = 1000
+    val storeSize = 10000
+    val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
+
+    // Insert a block with given replication factor and return the number of copies of the block
+    def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
+      val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
+      initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+      val numLocations = master.getLocations(blockId).size
+      allStores.foreach { _.removeBlock(blockId) }
+      numLocations
+    }
+
+    // 2x replication should work, 3x replication should only replicate 2x
+    assert(replicateAndGetNumCopies("a1", 2) === 2)
+    assert(replicateAndGetNumCopies("a2", 3) === 2)
+
+    // Add another store, 3x replication should work now, 4x replication should only replicate 3x
+    val newStore1 = makeBlockManager(storeSize, s"newstore1")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a3", 3) === 3)
+    }
+    assert(replicateAndGetNumCopies("a4", 4) === 3)
+
+    // Add another store, 4x replication should work now
+    val newStore2 = makeBlockManager(storeSize, s"newstore2")
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a5", 4) === 4)
+    }
+
+    // Remove all but the 1st store, 2x replication should fail
+    (initialStores.tail ++ Seq(newStore1, newStore2)).foreach {
+      store =>
+        master.removeExecutor(store.blockManagerId.executorId)
+        store.stop()
+    }
+    assert(replicateAndGetNumCopies("a6", 2) === 1)
+
+    // Add new stores, 3x replication should work
+    val newStores = (3 to 5).map {
+      i => makeBlockManager(storeSize, s"newstore$i")
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      assert(replicateAndGetNumCopies("a7", 3) === 3)
+    }
+  }
+
+  /**
+   * Test replication of blocks with different storage levels (various combinations of
+   * memory, disk & serialization). For each storage level, this function checks every store
+   * that master lists as a location for the block, and checks that master's block status
+   * for that store is correct. It then forces the block out of each store's memory (by
+   * adding dummy blocks) and checks again that master's knowledge gets updated.
+   */
+  private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+    import org.apache.spark.storage.StorageLevel._
+
+    assert(maxReplication > 1,
+      s"Cannot test replication factor $maxReplication")
+
+    // Store size passed to makeBlockManager, and size of the test block put for each level
+
+    val storeSize = 10000
+    val blockSize = 1000
+
+    // As many stores as the replication factor
+    val stores = (1 to maxReplication).map {
+      i => makeBlockManager(storeSize, s"store$i")
+    }
+
+    storageLevels.foreach { storageLevel =>
+      // Put the block into the first store
+      val blockId = new TestBlockId(
+        "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
+      stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+
+      // Assert that master knows as many locations for the block as the level's replication
+      val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
+      assert(blockLocations.size === storageLevel.replication,
+        s"master did not have ${storageLevel.replication} locations for $blockId, " + blockLocations)
+
+      // Test state of the stores that contain the block
+      stores.filter {
+        testStore => blockLocations.contains(testStore.blockManagerId.executorId)
+      }.foreach { testStore =>
+        val testStoreName = testStore.blockManagerId.executorId
+        assert(testStore.getLocal(blockId).isDefined, s"$blockId was not found in $testStoreName")
+        assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
+          s"master does not have status for ${blockId.name} in $testStoreName")
+
+        val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
+
+        // Assert that block status in the master for this store has the expected storage level
+        assert(
+          blockStatus.storageLevel.useDisk === storageLevel.useDisk &&
+            blockStatus.storageLevel.useMemory === storageLevel.useMemory &&
+            blockStatus.storageLevel.useOffHeap === storageLevel.useOffHeap &&
+            blockStatus.storageLevel.deserialized === storageLevel.deserialized,
+          s"master does not know correct storage level for ${blockId.name} in $testStoreName")
+
+        // Assert that the block status in the master for this store has correct memory usage info
+        assert(!blockStatus.storageLevel.useMemory || blockStatus.memSize >= blockSize,
+          s"master does not know size of ${blockId.name} stored in memory of $testStoreName")
+
+
+        // If the block is supposed to be in memory, then drop the copy of the block in
+        // this store and test whether master is updated with zero memory usage for this store
+        if (storageLevel.useMemory) {
+          // Force the block to be dropped by adding a number of dummy blocks
+          (1 to 10).foreach {
+            i =>
+              testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+          }
+          (1 to 10).foreach {
+            i => testStore.removeBlock(s"dummy-block-$i")
+          }
+
+          val newBlockStatusOption = master.getBlockStatus(blockId).get(testStore.blockManagerId)
+
+          // Assert that the block status in the master either does not exist (block removed
+          // from every store) or has zero memory usage for this store
+          assert(
+            newBlockStatusOption.isEmpty || newBlockStatusOption.get.memSize === 0,
+            s"after dropping, master does not know size of ${blockId.name} " +
+              s"stored in memory of $testStoreName"
+          )
+        }
+
+        // If the block is supposed to be on disk (after dropping or otherwise), then
+        // test whether master has correct disk usage for this store
+        if (storageLevel.useDisk) {
+          assert(master.getBlockStatus(blockId)(testStore.blockManagerId).diskSize >= blockSize,
+            s"after dropping, master does not know size of ${blockId.name} " +
+              s"stored in disk of $testStoreName"
+          )
+        }
+      }
+      master.removeBlock(blockId)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/be0cc995/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f32ce6f..48c45bf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -189,7 +189,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store = makeBlockManager(2000, "exec1")
     store2 = makeBlockManager(2000, "exec2")
 
-    val peers = master.getPeers(store.blockManagerId, 1)
+    val peers = master.getPeers(store.blockManagerId)
     assert(peers.size === 1, "master did not return the other manager as a peer")
     assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
 
@@ -448,7 +448,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2memory expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
-    System.out.println(list2DiskGet)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
     assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
     assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org