Posted to reviews@spark.apache.org by shubhamchopra <gi...@git.apache.org> on 2016/05/17 18:24:31 UTC

[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

GitHub user shubhamchopra opened a pull request:

    https://github.com/apache/spark/pull/13152

    [SPARK-15353] [CORE] Making peer selection for block replication pluggable

    ## What changes were proposed in this pull request?
    
    This PR makes block replication strategies pluggable. It provides two traits that can be implemented: one maps a host to its topology and is used in the master, and the other prioritizes a list of peers for block replication and runs in the executors.
    
    This patch contains default implementations of these traits that make sure current Spark behavior is unchanged.
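
A minimal sketch of the two extension points described above, under stated assumptions: `PeerId`, `HostTopologyMapper`, `PeerPrioritizer`, `StaticTopologyMapper` and `OtherTopologyFirst` are hypothetical stand-ins written for illustration, not the classes from this PR. The real traits work with Spark's `BlockManagerId` and `BlockId`; plain case classes and strings are used here so the example is self-contained.

```scala
// Simplified stand-in for Spark's BlockManagerId (hypothetical, illustration only).
final case class PeerId(executorId: String, host: String, topologyInfo: Option[String])

// Master side: resolves a host name to its topology, for example a rack.
trait HostTopologyMapper {
  def getTopologyForHost(hostname: String): Option[String]
}

// Executor side: orders candidate peers, most preferred first.
trait PeerPrioritizer {
  def prioritize(
      current: PeerId,
      peers: Seq[PeerId],
      peersReplicatedTo: Set[PeerId],
      blockId: String): Seq[PeerId]
}

// Hypothetical mapper backed by an in-memory map; unknown hosts yield None.
class StaticTopologyMapper(racks: Map[String, String]) extends HostTopologyMapper {
  override def getTopologyForHost(hostname: String): Option[String] = racks.get(hostname)
}

// Hypothetical strategy: peers in a different topology than `current` come first.
// blockId is accepted to match the trait but is not used by this strategy.
object OtherTopologyFirst extends PeerPrioritizer {
  override def prioritize(
      current: PeerId,
      peers: Seq[PeerId],
      peersReplicatedTo: Set[PeerId],
      blockId: String): Seq[PeerId] = {
    val candidates = peers.filterNot(peersReplicatedTo.contains)
    // partition is stable, so the incoming order is kept within each group.
    val (elsewhere, sameTopology) =
      candidates.partition(_.topologyInfo != current.topologyInfo)
    elsewhere ++ sameTopology
  }
}
```

Keeping the mapper and the prioritizer as separate traits mirrors the split in the description: only the master needs to know how topology is resolved, while executors only need the resolved value attached to each peer.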
    
    
    ## How was this patch tested?
    
    This patch should not change Spark behavior in any way, and was tested with unit tests for storage.
    
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shubhamchopra/spark RackAwareBlockReplication

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/13152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #13152
    
----
commit 779ce27dbeedd4d5c72e28782c9d38af51d2060c
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-05T22:06:14Z

    Adding capability to prioritize peer executors based on rack awareness while replicating blocks.

commit d0b6747f1fc9a0b701ab41fe5cf67939ed36cb9e
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-06T17:40:47Z

    Minor modifications to get past the style check errors.

commit 942908ac060fbdd29d0efd1f8541436bf9cd46d8
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-06T20:31:22Z

    Using blockId hashcode as a source of randomness, so we don't keep choosing the same peers for replication.

commit 0902e39fc7a2526539013e67c48bc13b6991bf07
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-09T20:36:53Z

    Several changes:
    1. Adding rack attribute to hashcode and equals to block manager id.
    2. Removing boolean check for rack awareness. Asking master for rack info, and master uses topology mapper.
    3. Adding a topology mapper trait and a default implementation that block manager master endpoint uses to discern topology information.

commit 86e1e0212b0dae0d598f0128c6a7b8f33429dc27
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-09T20:58:21Z

    Adding null check so a Block Manager can be initialized without the master.

commit a3b50ae9bcca7e871d384fa4614b2c77ac5ff5ad
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-12T21:09:16Z

    Renaming classes/variables from rack to a more general topology.

commit 1ee7948ce3994df08119418b779f8cc2e5aaca86
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-12T21:15:46Z

    Renaming classes/variables from rack to a more general topology.

commit 8de5c6e39cd0a868094803a0f53b3b50b7ed90d5
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-12T21:27:29Z

    We continue to randomly choose peers, so there is no change in current behavior.

commit 72ae37d64724423c65d3a23559a5f46649ffa4c3
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-13T15:36:17Z

    Spelling correction and minor changes in comments to use a more general topology instead of rack.

commit e071ca3a838193efad715764cc654507ee254e44
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-13T20:32:13Z

    Minor change. Changing replication info message to debug level.

commit 96aaf6ec50ae943c1345966cfc11fd4180ddfa3a
Author: Shubham Chopra <sc...@bloomberg.net>
Date:   2016-05-16T21:47:33Z

    Providing peersReplicateTo to the prioritizer.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170799
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
    +   *         ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
    +   *         This function only returns the topology information without the hostname.
    +   *         This information can be used when choosing executors for block replication
    +   *         to discern executors from a different rack than a candidate executor, for example.
    +   *
    +   *         An implementation can choose to use empty strings or None in case topology info
    +   *         is not available. This would imply that all such executors belong to the same rack.
    +   */
    +  def getTopologyForHost(hostname: String): Option[String]
    +}
    +
    +@DeveloperApi
    +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  override def getTopologyForHost(hostname: String): Option[String] = {
    +    logDebug(s"Got a request for $hostname")
    +    Some("DefaultRack")
    +  }
    +}
    +
    +/**
    + * A simple file based topology mapper. This expects topology information provided as a
    + * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property
    + * `spark.replication.topologyawareness.topologyFile`. To use this topology mapper, set the
    + * `spark.replication.topologyawareness.topologyMapper` property to
    + * [[org.apache.spark.storage.FileBasedTopologyMapper]]
    + * @param conf SparkConf object
    + */
    +@DeveloperApi
    +class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    --- End diff --
    
    can we add a unit test for FileBasedTopologyMapper itself?
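
As a rough illustration of what such a test might exercise: the body of `FileBasedTopologyMapper` is not shown in the diff above, so `PropertiesTopologyMapper` and `TopologyFileFixture` below are hypothetical stand-ins. The sketch assumes the topology file is a `java.util.Properties` file with one `hostname=topology` entry per line, as the doc comment describes, and that a missing or empty entry maps to `None`.

```scala
import java.io.{File, FileInputStream, PrintWriter}
import java.util.Properties

// Hypothetical stand-in, not the FileBasedTopologyMapper from the PR.
class PropertiesTopologyMapper(topologyFile: String) {
  // Load the whole file once; the stream is closed even if loading fails.
  private val topologyMap: Properties = {
    val props = new Properties()
    val in = new FileInputStream(topologyFile)
    try props.load(in) finally in.close()
    props
  }

  // Missing hosts and empty values are both reported as None (an assumption).
  def getTopologyForHost(hostname: String): Option[String] =
    Option(topologyMap.getProperty(hostname)).map(_.trim).filter(_.nonEmpty)
}

// Hypothetical test helper: writes the given lines to a temporary properties file.
object TopologyFileFixture {
  def writeTemp(lines: Seq[String]): String = {
    val file = File.createTempFile("topology", ".properties")
    file.deleteOnExit()
    val writer = new PrintWriter(file)
    try lines.foreach(line => writer.println(line)) finally writer.close()
    file.getAbsolutePath
  }
}
```

A test along these lines would write a small file with `TopologyFileFixture.writeTemp`, build the mapper from the returned path, and check a known host, a host with an empty value, and a host that is absent from the file.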





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73615714
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  def prioritize(blockManagerId: BlockManagerId,
    --- End diff --
    
    style




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on the pull request:

    https://github.com/apache/spark/pull/13152#issuecomment-220156087
  
    Fixed style issues pointed out by @HyukjinKwon 




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161937
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,88 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(!(numFailures > maxReplicationFailures
    +          || peersForReplication.isEmpty
    +          || peersReplicatedTo.size == numPeersToReplicateTo)) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    +        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        blockTransferService.uploadBlockSync(
    +          peer.host,
    +          peer.port,
    +          peer.executorId,
    +          blockId,
    +          new NettyManagedBuffer(data.toNetty),
    +          tLevel,
    +          classTag)
    +        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +          s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +        // the block was replicated, lets update state and move ahead
    +
    +        peersForReplication = peersForReplication.tail
    +        peersReplicatedTo += peer
    +      } catch {
    +        case e: Exception =>
    +          logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
    +          peersFailedToReplicateTo += peer
    +          // we have a failed replication, so we get the list of peers again
    +          // we don't want peers we have already replicated to and the ones that
    +          // have failed previously
    +          val filteredPeers = getPeers(true).filter { p =>
    +            !(peersFailedToReplicateTo.contains(p) || peersReplicatedTo.contains(p))
    --- End diff --
    
    ?




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    The topology info is queried only when the executor initializes and is assumed to stay the same throughout the life of the executor. I am assuming the exact way this information is provided may differ depending on the cluster manager being used. Resolving it at the master keeps the implementation simpler, as only the master needs access to the service/script/class used to resolve the topology. The communication overhead is minimal, as the executors have to communicate with the master when they initialize anyway.
    
    The getRandomPeer() method was doing quite a bit more than just getting a random peer. It was being used to manage/mutate state, which was being mutated in other places as well. I tried to keep the block placement strategy and the usage of its output separate, to make it simpler to provide a new block placement strategy. I also thought it would be best to decouple internal replication state management from the block replication strategy, while still keeping the structure of the state the same.
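
The separation described in this paragraph can be sketched as follows. This is a simplified, hypothetical model, not the code from the PR: peers are plain strings, `fetchPeers`, `prioritize` and `upload` are injected functions standing in for `getPeers`, the prioritizer and `uploadBlockSync`, and the handling after a failure (incrementing the counter, then re-prioritizing the filtered peers) is an assumption, since the quoted diff is cut off at that point.

```scala
object ReplicationLoopSketch {
  type Peer = String

  // Returns (peers replicated to, peers that failed).
  def replicate(
      fetchPeers: Boolean => Seq[Peer],                 // argument: force a fresh fetch
      prioritize: (Seq[Peer], Set[Peer]) => Seq[Peer],  // pure ordering, no shared state
      upload: Peer => Unit,                             // throws on failure
      numPeersToReplicateTo: Int,
      maxReplicationFailures: Int): (Set[Peer], Set[Peer]) = {
    // All mutable state lives in the loop, not in the placement strategy.
    var candidates = prioritize(fetchPeers(false), Set.empty[Peer])
    var replicatedTo = Set.empty[Peer]
    var failedOn = Set.empty[Peer]
    var numFailures = 0

    while (numFailures <= maxReplicationFailures &&
        candidates.nonEmpty &&
        replicatedTo.size < numPeersToReplicateTo) {
      val peer = candidates.head
      try {
        upload(peer)
        replicatedTo += peer
        candidates = candidates.tail
      } catch {
        case _: Exception =>
          failedOn += peer
          numFailures += 1
          // Refresh the peer list, drop peers already used, then re-prioritize.
          val remaining =
            fetchPeers(true).filterNot(p => failedOn.contains(p) || replicatedTo.contains(p))
          candidates = prioritize(remaining, replicatedTo)
      }
    }
    (replicatedTo, failedOn)
  }
}
```

Because `prioritize` only receives its inputs and returns an ordering, a different placement strategy can be swapped in without touching the bookkeeping in the loop.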
    
    The costlier operation here is the RPC fetch of all the peers from the master. The prioritization algorithm is called only once if there are no failures. If there are failures, the list of peers is requested from the master again before the prioritizer is run. The bigger hit, again, would be the RPC communication between the executor and the master. Random.shuffle in the default prioritizer uses a Fisher-Yates shuffle, so it is linear in time.
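
To make the linear-time point concrete, here is a hand-written Fisher-Yates shuffle seeded with the block id's hash code. It is a sketch for illustration only: `SeededShuffleSketch` is a hypothetical name, peers and block ids are plain strings, and the default prioritizer in the PR calls `Random.shuffle` rather than code like this.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object SeededShuffleSketch {
  // Fisher-Yates: one pass from the end of the buffer, one swap per position.
  def fisherYates[T](items: Seq[T], rng: Random): Vector[T] = {
    val buf = ArrayBuffer.empty[T]
    buf ++= items
    var i = buf.length - 1
    while (i > 0) {
      val j = rng.nextInt(i + 1) // uniform index in [0, i]
      val tmp = buf(i)
      buf(i) = buf(j)
      buf(j) = tmp
      i -= 1
    }
    buf.toVector
  }

  // Seeding with the block id's hash code makes the order repeatable per block.
  def prioritize(peers: Seq[String], blockId: String): Vector[String] =
    fisherYates(peers, new Random(blockId.hashCode))
}
```

Calling `SeededShuffleSketch.prioritize` twice with the same peer list and block id returns the same ordering, which matches the behavior described in the removed `getRandomPeer()` comment: selection is deterministic on the block id as long as the peer list does not change.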
    
    





[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636294
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.apache.spark.annotation.DeveloperApi
    +
    +import scala.util.Random
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  def prioritize(peers: Seq[BlockManagerId],
    +                 peersReplicatedTo: Set[BlockManagerId],
    +                 blockId: BlockId): Seq[BlockManagerId]
    +}
    +
    +@DeveloperApi
    +class DefaultBlockReplicationPrioritization(host: String)
    +  extends BlockReplicationPrioritization
    +  with Logging {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
    +   * that just makes sure we put blocks on different hosts, if possible
    +   *
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  override def prioritize(peers: Seq[BlockManagerId],
    +                          peersReplicatedTo: Set[BlockManagerId],
    +                          blockId: BlockId): Seq[BlockManagerId] = {
    --- End diff --
    
    Here too..
    
    ```scala
    override def prioritize(peers: Seq[BlockManagerId],
      peersReplicatedTo: Set[BlockManagerId],
      blockId: BlockId): Seq[BlockManagerId] = {
    ```




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73751665
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    --- End diff --
    
    Fixed this.
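
For readers following the hunk above, the control flow of the new loop can be sketched roughly as below. This is a simplified stand-in, not the code in the patch: `String` replaces `BlockManagerId`, the `upload` function is a hypothetical placeholder for `blockTransferService.uploadBlockSync`, and on failure the sketch simply moves to the next peer instead of re-fetching and re-prioritizing the peer list.

```scala
object ReplicationLoopSketch {
  // Returns (peers replicated to, peers that failed). All names are illustrative.
  def replicate(
      peers: Seq[String],
      numPeersToReplicateTo: Int,
      maxReplicationFailures: Int,
      upload: String => Unit): (Set[String], Set[String]) = {
    var remaining = peers
    var replicatedTo = Set.empty[String]
    var failedTo = Set.empty[String]
    var numFailures = 0

    // Stop on: too many failures, no peers left, or enough replicas made.
    while (numFailures <= maxReplicationFailures
        && remaining.nonEmpty
        && replicatedTo.size < numPeersToReplicateTo) {
      val peer = remaining.head
      try {
        upload(peer)
        replicatedTo += peer
      } catch {
        case e: Exception =>
          failedTo += peer
          numFailures += 1
      }
      // Simplification: the patch re-fetches peers on failure; here we just advance.
      remaining = remaining.tail
    }
    (replicatedTo, failedTo)
  }
}
```

Passing the upload step in as a function keeps the sketch testable without a running cluster.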


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426070
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import java.io.{File, FileInputStream}
    +import java.util.Properties
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : '/myrack/myhost', where '/' is the topology delimiter,
    +   *         'myrack' is the topology identifier, and 'myhost' is the individual host.
    +   *         This function only returns the topology information without the hostname.
    --- End diff --
    
    can you document what an empty string means?
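
One possible convention, stated purely as an assumption and not confirmed by the patch: an empty string means "no topology information is known for this host". A minimal sketch with a simplified stand-in for the abstract class (the `SparkConf` parameter is omitted, the method name `getTopologyForHost` is assumed, and the map-backed mapper is hypothetical):

```scala
object TopologySketch {
  // Simplified stand-in for the TopologyMapper in the diff (SparkConf omitted).
  abstract class TopologyMapper {
    /** Returns topology info such as "/myrack"; "" is assumed to mean "unknown". */
    def getTopologyForHost(hostname: String): String
  }

  // Hypothetical mapper backed by an in-memory host -> rack table.
  class MapBackedTopologyMapper(racks: Map[String, String]) extends TopologyMapper {
    override def getTopologyForHost(hostname: String): String =
      racks.getOrElse(hostname, "") // assumed convention: empty string = no info
  }

  val mapper = new MapBackedTopologyMapper(Map("host-a" -> "/myrack"))
}
```

Whatever the empty string ends up meaning, stating it in the scaladoc lets implementers of the trait agree on it.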





[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/13152#issuecomment-219809650
  
    Can one of the admins verify this patch?




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73614814
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,88 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(!(numFailures > maxReplicationFailures
    +          || peersForReplication.isEmpty
    +          || peersReplicatedTo.size == numPeersToReplicateTo)) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    +        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        blockTransferService.uploadBlockSync(
    +          peer.host,
    +          peer.port,
    +          peer.executorId,
    +          blockId,
    +          new NettyManagedBuffer(data.toNetty),
    +          tLevel,
    +          classTag)
    +        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +          s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +        // the block was replicated, lets update state and move ahead
    +
    +        peersForReplication = peersForReplication.tail
    +        peersReplicatedTo += peer
    +      } catch {
    +        case e: Exception =>
    +          logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
    +          peersFailedToReplicateTo += peer
    +          // we have a failed replication, so we get the list of peers again
    +          // we don't want peers we have already replicated to and the ones that
    +          // have failed previously
    +          val filteredPeers = getPeers(true).filter { p =>
    +            !(peersFailedToReplicateTo.contains(p) || peersReplicatedTo.contains(p))
    --- End diff --
    
    nit: seems a little easier to read if it was !a && !b
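
For reference, the two forms are equivalent by De Morgan's law. A minimal sketch, with plain strings standing in for `BlockManagerId` (the peer names are made up for illustration):

```scala
object PeerFilterSketch {
  // Stand-ins for BlockManagerId values; the names are hypothetical.
  val peersFailedToReplicateTo: Set[String] = Set("exec-2")
  val peersReplicatedTo: Set[String] = Set("exec-1")
  val fetchedPeers: Seq[String] = Seq("exec-1", "exec-2", "exec-3", "exec-4")

  // Form used in the diff: !(a || b)
  val original: Seq[String] = fetchedPeers.filter { p =>
    !(peersFailedToReplicateTo.contains(p) || peersReplicatedTo.contains(p))
  }

  // Suggested form: !a && !b
  val suggested: Seq[String] = fetchedPeers.filter { p =>
    !peersFailedToReplicateTo.contains(p) && !peersReplicatedTo.contains(p)
  }
}
```

Both filters keep only the peers that have neither failed nor already received the block.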




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426300
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -160,8 +163,25 @@ private[spark] class BlockManager(
         blockTransferService.init(this)
         shuffleClient.init(appId)
     
    -    blockManagerId = BlockManagerId(
    -      executorId, blockTransferService.hostName, blockTransferService.port)
    +    blockReplicationPrioritizer = {
    +      val priorityClass = conf.get(
    +        "spark.replication.topologyawareness.prioritizer",
    +        "org.apache.spark.storage.DefaultBlockReplicationPrioritization")
    +      val clazz = Utils.classForName(priorityClass)
    +      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPrioritization]
    +      logInfo(s"Using $priorityClass for prioritizing peers")
    --- End diff --
    
    ```
    Using $priorityClass for block replication policy
    ```
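
The instantiation pattern in the hunk (read a class name from configuration, load it reflectively, cast to the trait) can be sketched outside Spark as follows. The assumptions here: plain `Class.forName` is used rather than Spark's `Utils.classForName`, a `Map` stands in for `SparkConf`, and the trait and class names are hypothetical.

```scala
object PluggablePolicySketch {
  trait ReplicationPolicy { def name: String }

  // Default implementation; reflection needs a public no-arg constructor.
  class DefaultReplicationPolicy extends ReplicationPolicy {
    override def name: String = "default"
  }

  def loadPolicy(conf: Map[String, String]): ReplicationPolicy = {
    // Fall back to the default class when the key is not set.
    val className = conf.getOrElse(
      "spark.replication.topologyawareness.prioritizer",
      classOf[DefaultReplicationPolicy].getName)
    val clazz = Class.forName(className)
    val policy =
      clazz.getDeclaredConstructor().newInstance().asInstanceOf[ReplicationPolicy]
    println(s"Using $className for block replication policy")
    policy
  }
}
```

A misconfigured class name surfaces as a `ClassNotFoundException` or `ClassCastException` at startup, which is arguably the right time to fail.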




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    Rebased to master to resolve merge conflicts




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426587
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    --- End diff --
    
    can we just name this BlockReplicationPolicy?
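
To illustrate the contract described in the doc comment (return peers in priority order, and the caller walks the list in that order), here is a sketch of a policy under the suggested name. Its ordering is a shuffle seeded on the block id, mirroring the `new Random(blockId.hashCode)` behavior of the removed code. The types are simplified stand-ins (`String` for both `BlockManagerId` and `BlockId`) and the signature is an assumption, not the one in the patch.

```scala
import scala.util.Random

object ReplicationPolicySketch {
  trait BlockReplicationPolicy {
    /** Returns candidate peers ordered from most to least preferred. */
    def prioritize(
        localId: String,
        peers: Seq[String],
        peersReplicatedTo: Set[String],
        blockId: String): Seq[String]
  }

  object RandomPolicy extends BlockReplicationPolicy {
    override def prioritize(
        localId: String,
        peers: Seq[String],
        peersReplicatedTo: Set[String],
        blockId: String): Seq[String] = {
      // Seeding on the block id keeps the ordering stable across repeated calls.
      val random = new Random(blockId.hashCode)
      random.shuffle(peers.filterNot(peersReplicatedTo.contains))
    }
  }
}
```

A rack-aware policy would implement the same method but order peers by topology instead of shuffling.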
    





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161348
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -126,6 +138,10 @@ class BlockManagerMasterEndpoint(
           }
       }
     
    +  private def getTopologyInfoForHost(host: String): String = {
    --- End diff --
    
    inline?




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426940
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    --- End diff --
    
    can you make it explicit to use a mutable HashSet here?
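
The distinction being asked for, as I read it: `var x = Set.empty[T]` with `+=` rebinds the variable to a new immutable set on every update, whereas a `val` bound to a `mutable.HashSet` is updated in place. A small sketch with `String` standing in for `BlockManagerId`:

```scala
import scala.collection.mutable

object PeerSetSketch {
  // Pattern in the diff: immutable Set held in a var; += desugars to x = x + elem.
  var immutablePeers = Set.empty[String]
  immutablePeers += "exec-1"

  // Suggested pattern: explicit mutable HashSet held in a val; += mutates in place.
  val mutablePeers = mutable.HashSet.empty[String]
  mutablePeers += "exec-1"
}
```

Both end up holding the same element; the second form makes the mutation visible in the type.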




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161980
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,88 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(numFailures <= maxReplicationFailures
    +          && !peersForReplication.isEmpty
    +          && peersReplicatedTo.size != numPeersToReplicateTo) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    +        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        blockTransferService.uploadBlockSync(
    +          peer.host,
    +          peer.port,
    +          peer.executorId,
    +          blockId,
    +          new NettyManagedBuffer(data.toNetty),
    +          tLevel,
    +          classTag)
    +        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +          s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +        // the block was replicated, lets update state and move ahead
    --- End diff --
    
    don't need this comment




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636336
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1079,109 +1103,97 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    --- End diff --
    
    It seems this indentation might not have to be changed.




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170841
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.collection.mutable
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPolicy {
    --- End diff --
    
    Can we create a BlockReplicationPolicySuite and add unit tests for RandomBlockReplicationPolicy? You'd want to verify that it returns the right number of peers and that the returned peers contain no duplicates.
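
The checks requested above could be sketched roughly as follows. This is only an illustration under stated assumptions: `PeerId` and `randomPrioritize` are hypothetical stand-ins, not Spark's `BlockManagerId` or the actual `RandomBlockReplicationPolicy` (whose signature is not shown in this excerpt), and plain `assert` is used in place of a test framework.

```scala
import scala.util.Random

object RandomPolicySketch {
  // Hypothetical stand-in for BlockManagerId; not Spark's class.
  final case class PeerId(executorId: String, host: String, port: Int)

  // Hypothetical stand-in for a random policy: shuffle deterministically on a
  // seed and keep at most numReplicas peers.
  def randomPrioritize(peers: Seq[PeerId], seed: Long, numReplicas: Int): List[PeerId] =
    new Random(seed).shuffle(peers).take(numReplicas).toList

  def main(args: Array[String]): Unit = {
    val peers = (1 to 10).map(i => PeerId(s"exec-$i", s"host-$i", 7000 + i))
    val chosen = randomPrioritize(peers, seed = 42L, numReplicas = 3)

    // Right number of peers.
    assert(chosen.size == 3)
    // No peer appears twice.
    assert(chosen.distinct.size == chosen.size)
    // Every returned peer comes from the candidate list.
    assert(chosen.forall(peers.contains))
    // Asking for more replicas than there are peers returns every peer once.
    assert(randomPrioritize(peers, 42L, 20).toSet == peers.toSet)
  }
}
```

A real suite would presumably run these checks against the policy class itself, over several peer counts and replication factors.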





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161947
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,88 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(numFailures <= maxReplicationFailures
    +          && !peersForReplication.isEmpty
    +          && peersReplicatedTo.size != numPeersToReplicateTo) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    +        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        blockTransferService.uploadBlockSync(
    +          peer.host,
    +          peer.port,
    +          peer.executorId,
    +          blockId,
    +          new NettyManagedBuffer(data.toNetty),
    +          tLevel,
    +          classTag)
    +        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +          s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +        // the block was replicated, lets update state and move ahead
    +
    --- End diff --
    
    extra newline




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75427102
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(numFailures <= maxReplicationFailures &&
    +      !peersForReplication.isEmpty &&
    +      peersReplicatedTo.size != numPeersToReplicateTo) {
    +      val peer = peersForReplication.head
    --- End diff --
    
    One danger with the head/tail operations here is that they can perform very badly if the data structure returned by prioritize makes them expensive. Should we make it return either a List (then head/tail is fine) or an Array (then you can just use indexes)?
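
The two options raised above might look like the following sketch. It is not the PR's code; the object and method names are made up for illustration and peers are represented as plain strings. On an immutable `List`, `head` and `tail` are constant-time, whereas on array-backed sequences such as `Array` or `ArrayBuffer`, `tail` copies the remaining elements, so walking by index avoids that cost.

```scala
import scala.collection.mutable.ArrayBuffer

object PeerTraversalSketch {
  // Option 1: consume a List with head/tail; both are O(1) on an immutable List.
  def visitList(peers: List[String]): Seq[String] = {
    val visited = ArrayBuffer.empty[String]
    var remaining = peers
    while (remaining.nonEmpty) {
      visited += remaining.head
      remaining = remaining.tail
    }
    visited.toList
  }

  // Option 2: walk an Array by index, so no intermediate collection is allocated.
  def visitArray(peers: Array[String]): Seq[String] = {
    val visited = ArrayBuffer.empty[String]
    var i = 0
    while (i < peers.length) {
      visited += peers(i)
      i += 1
    }
    visited.toList
  }
}
```

Both traversals visit the peers in priority order; the difference is only in how much work each step does for the given collection type.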
    





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74634331
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,88 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(!(numFailures > maxReplicationFailures
    +          || peersForReplication.isEmpty
    +          || peersReplicatedTo.size == numPeersToReplicateTo)) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    +        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        blockTransferService.uploadBlockSync(
    +          peer.host,
    +          peer.port,
    +          peer.executorId,
    +          blockId,
    +          new NettyManagedBuffer(data.toNetty),
    +          tLevel,
    +          classTag)
    +        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +          s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +        // the block was replicated, lets update state and move ahead
    +
    +        peersForReplication = peersForReplication.tail
    +        peersReplicatedTo += peer
    +      } catch {
    +        case e: Exception =>
    +          logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
    +          peersFailedToReplicateTo += peer
    +          // we have a failed replication, so we get the list of peers again
    +          // we don't want peers we have already replicated to and the ones that
    +          // have failed previously
    +          val filteredPeers = getPeers(true).filter { p =>
    +            !(peersFailedToReplicateTo.contains(p) || peersReplicatedTo.contains(p))
    --- End diff --
    
    Still seems the same




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73615313
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala ---
    @@ -103,8 +121,11 @@ private[spark] object BlockManagerId {
        * @param port Port of the block manager.
        * @return A new [[org.apache.spark.storage.BlockManagerId]].
        */
    -  def apply(execId: String, host: String, port: Int): BlockManagerId =
    -    getCachedBlockManagerId(new BlockManagerId(execId, host, port))
    +  def apply(execId: String,
    --- End diff --
    
    The style here is inconsistent; can you move execId to the next line?
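
For illustration, the layout being asked for, as used by the pre-existing `replicate` signature quoted elsewhere in this thread, places nothing after the opening parenthesis and gives each parameter its own line with a four-space indent. The sketch below uses a hypothetical `PeerId` case class and `make` method rather than the real `BlockManagerId.apply`, whose new parameter list is cut off in the diff above.

```scala
object ApplyStyleSketch {
  // Hypothetical stand-in for BlockManagerId; not Spark's class.
  final case class PeerId(execId: String, host: String, port: Int)

  // Each parameter on its own line, indented four spaces; the first
  // parameter starts on the line after the opening parenthesis.
  def make(
      execId: String,
      host: String,
      port: Int): PeerId =
    PeerId(execId, host, port)
}
```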




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426951
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    --- End diff --
    
    for both peersReplicatedTo and peersFailedToReplicateTo


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170404
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.collection.mutable
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPolicy {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @param numPeersToReplicateTo Number of peers we need to replicate to
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority.
    +   *         This returns a list of size at most `numPeersToReplicateTo`.
    +   */
    +  def prioritize(
    +      blockManagerId: BlockManagerId,
    +      peers: Seq[BlockManagerId],
    +      peersReplicatedTo: mutable.HashSet[BlockManagerId],
    +      blockId: BlockId,
    +      numPeersToReplicateTo: Int): List[BlockManagerId]
    --- End diff --
    
    numReplicas?




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170909
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.collection.mutable
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPolicy {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @param numPeersToReplicateTo Number of peers we need to replicate to
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority.
    +   *         This returns a list of size at most `numPeersToReplicateTo`.
    +   */
    +  def prioritize(
    +      blockManagerId: BlockManagerId,
    +      peers: Seq[BlockManagerId],
    +      peersReplicatedTo: mutable.HashSet[BlockManagerId],
    +      blockId: BlockId,
    +      numPeersToReplicateTo: Int): List[BlockManagerId]
    +}
    +
    +@DeveloperApi
    +class RandomBlockReplicationPolicy
    +  extends BlockReplicationPolicy
    +  with Logging {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
    +   * that just makes sure we put blocks on different hosts, if possible
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  override def prioritize(
    +      blockManagerId: BlockManagerId,
    +      peers: Seq[BlockManagerId],
    +      peersReplicatedTo: mutable.HashSet[BlockManagerId],
    +      blockId: BlockId,
    +      numReplicas: Int): List[BlockManagerId] = {
    +    val random = new Random(blockId.hashCode)
    +    logDebug(s"Input peers : ${peers.mkString(", ")}")
    +    val prioritizedPeers = if (peers.size > numReplicas) {
    +      getSampleIds(peers.size, numReplicas, random).map(peers(_))
    +    } else {
    +      if (peers.size < numReplicas) {
    +        logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
    +      }
    +      random.shuffle(peers).toList
    +    }
    +    logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
    +    prioritizedPeers
    +  }
    +
    +  /**
    +   * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
    +   * minimizing space usage
    +   * [[http://math.stackexchange.com/questions/178690/
    +   * whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin]]
    +   *
    +   * @param n total number of indices
    +   * @param m number of samples needed
    +   * @param r random number generator
    +   * @return list of m random unique indices
    +   */
    +  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
    +    val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (s, i) =>
    --- End diff --
    
    This is a bit difficult to read (in my experience, foldLeft very rarely makes code easy to read).
    
    Perhaps just write it in a more imperative way?
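    
    A minimal sketch of what a more imperative version might look like, assuming the helper is meant to pick `m` distinct 0-based indices out of `n` with Floyd's algorithm. The names `FloydSampleImperative` and `sampleIds` are hypothetical, and this is not the code in the PR:
    
    ```scala
    import scala.collection.mutable
    import scala.util.Random
    
    // Hypothetical stand-in for getSampleIds; a sketch, not the PR's implementation.
    object FloydSampleImperative {
      def sampleIds(n: Int, m: Int, r: Random): List[Int] = {
        require(m >= 0 && m <= n, "need 0 <= m <= n")
        val chosen = mutable.LinkedHashSet.empty[Int]
        var i = n - m + 1
        while (i <= n) {
          // t is uniform in 1..i
          val t = r.nextInt(i) + 1
          // if t was already picked, take i instead; i cannot be in the set yet
          if (chosen.contains(t)) chosen += i else chosen += t
          i += 1
        }
        // shift from 1-based values to 0-based indices
        chosen.toList.map(_ - 1)
      }
    }
    ```
    
    Whether the explicit loop reads better than the fold is a judgment call; the selection logic is the same either way.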





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76666684
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.collection.mutable
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPolicy {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @param numPeersToReplicateTo Number of peers we need to replicate to
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority.
    +   *         This returns a list of size at most `numPeersToReplicateTo`.
    +   */
    +  def prioritize(
    +      blockManagerId: BlockManagerId,
    +      peers: Seq[BlockManagerId],
    +      peersReplicatedTo: mutable.HashSet[BlockManagerId],
    +      blockId: BlockId,
    +      numPeersToReplicateTo: Int): List[BlockManagerId]
    +}
    +
    +@DeveloperApi
    +class RandomBlockReplicationPolicy
    +  extends BlockReplicationPolicy
    +  with Logging {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
    +   * that just makes sure we put blocks on different hosts, if possible
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  override def prioritize(
    +      blockManagerId: BlockManagerId,
    +      peers: Seq[BlockManagerId],
    +      peersReplicatedTo: mutable.HashSet[BlockManagerId],
    +      blockId: BlockId,
    +      numReplicas: Int): List[BlockManagerId] = {
    +    val random = new Random(blockId.hashCode)
    +    logDebug(s"Input peers : ${peers.mkString(", ")}")
    +    val prioritizedPeers = if (peers.size > numReplicas) {
    +      getSampleIds(peers.size, numReplicas, random).map(peers(_))
    +    } else {
    +      if (peers.size < numReplicas) {
    +        logWarning(s"Expecting ${numReplicas} replicas with only ${peers.size} peer/s.")
    +      }
    +      random.shuffle(peers).toList
    +    }
    +    logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
    +    prioritizedPeers
    +  }
    +
    +  /**
    +   * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
    +   * minimizing space usage
    +   * [[http://math.stackexchange.com/questions/178690/
    +   * whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin]]
    +   *
    +   * @param n total number of indices
    +   * @param m number of samples needed
    +   * @param r random number generator
    +   * @return list of m random unique indices
    +   */
    +  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
    +    val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (s, i) =>
    --- End diff --
    
    I changed the variable names a little to make it clearer that the left argument is the set we are accumulating into.
    This is just a two-line usage, so I am fine either way. Using folds, though, is the more general and recommended idiomatic way of writing accumulating loops of this kind in Scala. In my experience, imperative code is a lot more error-prone and harder to reason about. I generally stick with idiomatic Scala unless practicality dictates otherwise.
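    
    For illustration, a fold over Floyd's sampling with the accumulator named explicitly could look roughly like this. It is a sketch under assumptions rather than the exact code in the PR; `FloydSampleFold`, `sampleIds` and `chosen` are hypothetical names:
    
    ```scala
    import scala.util.Random
    
    // Hypothetical stand-in for getSampleIds; a sketch, not the PR's implementation.
    object FloydSampleFold {
      def sampleIds(n: Int, m: Int, r: Random): List[Int] = {
        require(m >= 0 && m <= n, "need 0 <= m <= n")
        // `chosen` is the accumulated set, `i` is the current upper bound
        val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) { case (chosen, i) =>
          val t = r.nextInt(i) + 1 // uniform in 1..i
          if (chosen.contains(t)) chosen + i else chosen + t
        }
        // shift from 1-based values to 0-based indices
        indices.toList.map(_ - 1)
      }
    }
    ```
    
    Seeding `Random` the same way on each call (for example from `blockId.hashCode`, as `prioritize` does) yields the same sample every time.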




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73615717
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,76 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  def prioritize(blockManagerId: BlockManagerId,
    +    peers: Seq[BlockManagerId],
    +    peersReplicatedTo: Set[BlockManagerId],
    +    blockId: BlockId): Seq[BlockManagerId]
    +}
    +
    +@DeveloperApi
    +class DefaultBlockReplicationPrioritization
    +  extends BlockReplicationPrioritization
    +  with Logging {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
    +   * that just makes sure we put blocks on different hosts, if possible
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  override def prioritize(blockManagerId: BlockManagerId,
    --- End diff --
    
    style




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    @rxin 




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636922
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1079,109 +1103,97 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    -    val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    +    val numPeersToReplicateTo = level.replication - 1
    +
    +    @tailrec def replicateBlock(
    +      numFailures: Int,
    +      peersForReplication: Seq[BlockManagerId],
    +      peersReplicatedTo: Set[BlockManagerId],
    +      peersFailedToReplicateTo: Set[BlockManagerId]): Set[BlockManagerId] = {
    +
    +      if (numFailures > maxReplicationFailures
    +                || peersForReplication.isEmpty
    +                || peersReplicatedTo.size == numPeersToReplicateTo) {
    +        // This selection of a peer and replication is continued in a loop until one of the
    +        // following 3 conditions is fulfilled:
    +        // (i) specified number of peers have been replicated to
    +        // (ii) too many failures in replicating to peers
    +        // (iii) no peer left to replicate to
    +        peersReplicatedTo
    +      } else {
    +        val peer = peersForReplication.head
    +        val (updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers) = try {
    +          val onePeerStartTime = System.currentTimeMillis
    +          logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +          blockTransferService.uploadBlockSync(
    +            peer.host,
    +            peer.port,
    +            peer.executorId,
    +            blockId,
    +            new NettyManagedBuffer(data.toNetty),
    +            tLevel,
    +            classTag)
    +          logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +            s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +          // the block was replicated, lets update state and move ahead
    +          (numFailures,
    +            peersForReplication.tail,
    +            peersReplicatedTo + peer,
    +            peersFailedToReplicateTo)
    +        } catch {
    +          case e: Exception =>
    +            logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
    +            val updatedFailedPeers = peersFailedToReplicateTo + peer
    +            // we have a failed replication, so we get the list of peers again
    +            // we don't want peers we have already replicated to and the ones that
    +            // have failed previously
    +            val filteredPeers = getPeers(true).filter{p =>
    +              !(updatedFailedPeers.contains(p) || peersReplicatedTo.contains(p))
                 }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    -          }
    -        case None => // no peer left to replicate to
    -          done = true
    +            val updatedPeers =
    +              blockReplicationPrioritizer.prioritize(filteredPeers, peersReplicatedTo, blockId)
    +            (numFailures + 1, updatedPeers, peersReplicatedTo, updatedFailedPeers)
    +        }
    +
    +        replicateBlock(updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers)
           }
         }
    -    val timeTakeMs = (System.currentTimeMillis - startTime)
    +
    +    val startTime = System.currentTimeMillis
    +    val peersReplicatedTo = replicateBlock(0,
    +      blockReplicationPrioritizer.prioritize(getPeers(false), Set.empty, blockId),
    +      Set.empty,
    +      Set.empty)
    --- End diff --
    
    Maybe this should be as below:
    
    ```scala
    replicateBlock(
      0,
      blockReplicationPrioritizer.prioritize(getPeers(false), Set.empty, blockId),
      Set.empty,
      Set.empty)
    ```




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    Thanks for the suggestions. I have corrected the style check errors and verified that locally, so hopefully there are no more style errors. I have also made a couple of modifications per your suggestions:
    1. Added TopologyMapperSuite and BlockReplicationPolicySuite for unit tests on these classes.
    2. Changed the config parameter names as suggested.




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    **[Test build #3232 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3232/consoleFull)** for PR 13152 at commit [`a0ed1e6`](https://github.com/apache/spark/commit/a0ed1e648a343f123bc41c1bdb94766b5a1e7938).




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    **[Test build #3225 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3225/consoleFull)** for PR 13152 at commit [`9b8ce32`](https://github.com/apache/spark/commit/9b8ce3229d0cff64e77d55563cec3cc3cda29182).




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170624
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
    +   *         ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
    +   *         This function only returns the topology information without the hostname.
    +   *         This information can be used when choosing executors for block replication
    +   *         to discern executors from a different rack than a candidate executor, for example.
    +   *
    +   *         An implementation can choose to use empty strings or None in case topology info
    +   *         is not available. This would imply that all such executors belong to the same rack.
    +   */
    +  def getTopologyForHost(hostname: String): Option[String]
    +}
    +
    +@DeveloperApi
    +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  override def getTopologyForHost(hostname: String): Option[String] = {
    +    logDebug(s"Got a request for $hostname")
    +    Some("DefaultRack")
    --- End diff --
    
    should this return None instead?
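
To make the reviewer's question concrete, here is a minimal sketch of the two candidate defaults. It is self-contained and does not use Spark: `TopologyMapperSketch`, `NoTopologyMapper`, `SingleRackMapper` and `sameRack` are hypothetical names introduced for illustration, and only the `getTopologyForHost` signature is taken from the diff above.

```scala
object TopologyDefaults {
  // Stand-in for the TopologyMapper contract quoted in the diff (assumption: no SparkConf needed).
  trait TopologyMapperSketch {
    def getTopologyForHost(hostname: String): Option[String]
  }

  // Hypothetical default that reports "no topology known" instead of a placeholder rack.
  object NoTopologyMapper extends TopologyMapperSketch {
    override def getTopologyForHost(hostname: String): Option[String] = None
  }

  // Hypothetical default mirroring the diff: every host maps to one named rack.
  object SingleRackMapper extends TopologyMapperSketch {
    override def getTopologyForHost(hostname: String): Option[String] = Some("DefaultRack")
  }

  // Two hosts are treated as co-located when their topology values are equal.
  // None == None, so hosts without topology info are still grouped together.
  def sameRack(mapper: TopologyMapperSketch, a: String, b: String): Boolean =
    mapper.getTopologyForHost(a) == mapper.getTopologyForHost(b)
}
```

Under this reading both defaults place all executors in the same group, which matches the scaladoc's statement that missing topology info implies a single rack; the difference is only whether callers see a placeholder name or an explicit absence.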





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426435
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -298,7 +310,17 @@ class BlockManagerMasterEndpoint(
         ).map(_.flatten.toSeq)
       }
     
    -  private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
    +  private def register(dummyId: BlockManagerId,
    +    maxMemSize: Long,
    --- End diff --
    
    Also, instead of dummyId, I'd call it "idWithoutTopologyInfo".




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636200
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -124,6 +138,13 @@ class BlockManagerMasterEndpoint(
               }
             case None => context.reply(false)
           }
    +
    +    case GetTopologyInfo(host) => context.reply(getTopologyInfoForHost(host))
    +
    --- End diff --
    
    Maybe remove a newline here.




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426791
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(numFailures <= maxReplicationFailures &&
    +      !peersForReplication.isEmpty &&
    +      peersReplicatedTo.size != numPeersToReplicateTo) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    +        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        blockTransferService.uploadBlockSync(
    +          peer.host,
    +          peer.port,
    +          peer.executorId,
    +          blockId,
    +          new NettyManagedBuffer(data.toNetty),
    +          tLevel,
    +          classTag)
    +        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +          s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +        peersForReplication = peersForReplication.tail
    +        peersReplicatedTo += peer
    +      } catch {
    +        case e: Exception =>
    +          logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
    --- End diff --
    
    This should be an error-level log, shouldn't it?





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74650320
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,88 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(!(numFailures > maxReplicationFailures
    +          || peersForReplication.isEmpty
    +          || peersReplicatedTo.size == numPeersToReplicateTo)) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    +        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        blockTransferService.uploadBlockSync(
    +          peer.host,
    +          peer.port,
    +          peer.executorId,
    +          blockId,
    +          new NettyManagedBuffer(data.toNetty),
    +          tLevel,
    +          classTag)
    +        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +          s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +        // the block was replicated, lets update state and move ahead
    +
    +        peersForReplication = peersForReplication.tail
    +        peersReplicatedTo += peer
    +      } catch {
    +        case e: Exception =>
    +          logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
    +          peersFailedToReplicateTo += peer
    +          // we have a failed replication, so we get the list of peers again
    +          // we don't want peers we have already replicated to and the ones that
    +          // have failed previously
    +          val filteredPeers = getPeers(true).filter { p =>
    +            !(peersFailedToReplicateTo.contains(p) || peersReplicatedTo.contains(p))
    --- End diff --
    
    My bad, the latest commit fixes this. The while loop also had a similar condition, and I had fixed that earlier.
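
The loop and filter conditions discussed here can be sketched without Spark. This is an illustration under stated assumptions, not the patch itself: peers are plain strings, `upload` and `refetchPeers` are hypothetical stand-ins for `uploadBlockSync` and `getPeers(true)`, the prioritization step is omitted, and the failure bookkeeping after the filter is assumed because the quoted diff is cut off at that point.

```scala
object ReplicationLoopSketch {
  // Hypothetical stand-in for BlockManagerId.
  type Peer = String

  /** Returns (peers replicated to, peers that failed). */
  def replicate(
      initialPeers: List[Peer],
      refetchPeers: () => List[Peer],
      upload: Peer => Boolean, // false models a failed upload
      numPeersToReplicateTo: Int,
      maxReplicationFailures: Int): (Set[Peer], Set[Peer]) = {
    var candidates = initialPeers
    var replicatedTo = Set.empty[Peer]
    var failed = Set.empty[Peer]
    var numFailures = 0

    // Continue while all three hold: failure budget left, candidates left, target not reached.
    while (numFailures <= maxReplicationFailures &&
        candidates.nonEmpty &&
        replicatedTo.size != numPeersToReplicateTo) {
      val peer = candidates.head
      if (upload(peer)) {
        candidates = candidates.tail
        replicatedTo += peer
      } else {
        failed += peer
        numFailures += 1
        // Re-fetch the peer list, dropping peers that already succeeded or failed.
        candidates = refetchPeers().filter { p =>
          !failed.contains(p) && !replicatedTo.contains(p)
        }
      }
    }
    (replicatedTo, failed)
  }
}
```

The positive form of the loop condition used here is the De Morgan equivalent of the negated disjunction in the quoted diff, which is easier to check against the three stopping conditions listed in the removed comment.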




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426567
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    --- End diff --
    
    remove these params unless you really are going to document them.





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170578
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
    +   *         ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
    +   *         This function only returns the topology information without the hostname.
    +   *         This information can be used when choosing executors for block replication
    +   *         to discern executors from a different rack than a candidate executor, for example.
    +   *
    +   *         An implementation can choose to use empty strings or None in case topology info
    +   *         is not available. This would imply that all such executors belong to the same rack.
    +   */
    +  def getTopologyForHost(hostname: String): Option[String]
    +}
    +
    +@DeveloperApi
    +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  override def getTopologyForHost(hostname: String): Option[String] = {
    +    logDebug(s"Got a request for $hostname")
    +    Some("DefaultRack")
    +  }
    +}
    +
    +/**
    + * A simple file based topology mapper. This expects topology information provided as a
    + * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property
    + * `spark.replication.topologyawareness.topologyFile`. To use this topology mapper, set the
    + * `spark.replication.topologyawareness.topologyMapper` property to
    + * [[org.apache.spark.storage.FileBasedTopologyMapper]]
    + * @param conf SparkConf object
    + */
    +@DeveloperApi
    +class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile")
    --- End diff --
    
    I was looking at existing configs - can we change the config to
    ```
    spark.storage.replication.topologyFile
    ```
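
For context, a file-based lookup keyed on the suggested property name might look like the sketch below. It is not the PR's `FileBasedTopologyMapper`: a plain `Map[String, String]` stands in for `SparkConf`, the file contents are passed as a string, and `FileTopologySketch` and its members are hypothetical names. Note also that the quoted diff spells the key `topologyFile` in the scaladoc but `topologyfile` in the `getOption` call, so whichever name is chosen should be used consistently.

```scala
import java.io.StringReader
import java.util.Properties

object FileTopologySketch {
  // Key name proposed in the review comment above.
  val TopologyFileKey = "spark.storage.replication.topologyFile"

  // Assumption: a simple key/value map stands in for SparkConf.getOption.
  def topologyFilePath(conf: Map[String, String]): Option[String] =
    conf.get(TopologyFileKey)

  // Parses java.util.Properties text of the form "hostname=topology".
  def loadTopology(contents: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(contents))
    props
  }

  // Unknown hosts yield None rather than a placeholder value.
  def topologyForHost(props: Properties, hostname: String): Option[String] =
    Option(props.getProperty(hostname))
}
```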




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73615368
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---
    @@ -50,12 +50,20 @@ class BlockManagerMaster(
         logInfo("Removal of executor " + execId + " requested")
       }
     
    -  /** Register the BlockManager's id with the driver. */
    +  /** Register the BlockManager's id with the driver. The input BlockManagerId does not contain
    +   * topology information. This information is obtained from the master and we respond with an
    +   * updated BlockManagerId fleshed out with this information.
    +   *
    +   */
       def registerBlockManager(
    -      blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
    +                            blockManagerId: BlockManagerId,
    --- End diff --
    
    There is also a style issue here




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426605
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -20,6 +20,7 @@ package org.apache.spark.storage
     import java.io._
     import java.nio.ByteBuffer
     
    +import scala.annotation.tailrec
    --- End diff --
    
    is this used anywhere?





[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    **[Test build #3291 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3291/consoleFull)** for PR 13152 at commit [`632d043`](https://github.com/apache/spark/commit/632d0436ca701031e03bd141b95d4b0bb5544150).




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426412
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -298,7 +310,17 @@ class BlockManagerMasterEndpoint(
         ).map(_.flatten.toSeq)
       }
     
    -  private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
    +  private def register(dummyId: BlockManagerId,
    +    maxMemSize: Long,
    --- End diff --
    
    Use a 4-space indent, and put each argument on its own line, e.g.
    ```
    private def register(
        dummyId: BlockManagerId,
        maxMemSize: Long,
        slaveEndpoint: RpcEndpointRef): BlockManagerId = {
      ...
    }
    ```




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426231
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  def prioritize(
    +    blockManagerId: BlockManagerId,
    +    peers: Seq[BlockManagerId],
    --- End diff --
    
    Also, rather than returning a full prioritization, can we pass in the number of replicas wanted and return just a list of that many peers?
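
A minimal sketch of what a signature taking the number of replicas wanted might look like. The names `PeerId`, `ReplicaSelector`, `FirstAvailableSelector`, and `numReplicas` are hypothetical and do not appear in the PR; `PeerId` is only a stand-in for `BlockManagerId` so the example compiles on its own.

```scala
// Stand-in for Spark's BlockManagerId; only the fields this sketch needs.
final case class PeerId(executorId: String, host: String)

// Hypothetical variant of the trait: the caller states how many replicas it
// still needs, and the implementation returns at most that many peers.
trait ReplicaSelector {
  def select(
      self: PeerId,
      peers: Seq[PeerId],
      peersReplicatedTo: Set[PeerId],
      numReplicas: Int): Seq[PeerId]
}

// Simplest possible implementation: keep the incoming order, skip this node
// and peers that already hold a replica, then truncate to the requested count.
object FirstAvailableSelector extends ReplicaSelector {
  override def select(
      self: PeerId,
      peers: Seq[PeerId],
      peersReplicatedTo: Set[PeerId],
      numReplicas: Int): Seq[PeerId] = {
    peers
      .filterNot(p => p == self || peersReplicatedTo.contains(p))
      .take(math.max(numReplicas, 0))
  }
}
```

With a signature like this the caller no longer has to slice a full ordering itself, at the cost of asking again whenever one of the returned peers fails.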





[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636245
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.apache.spark.annotation.DeveloperApi
    +
    +import scala.util.Random
    +import org.apache.spark.internal.Logging
    --- End diff --
    
    Maybe reorder the imports. (See [scala-style-guide#imports](https://github.com/databricks/scala-style-guide#imports))




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161999
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,88 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(numFailures <= maxReplicationFailures
    --- End diff --
    
    The indentation still looks off here. Should the && go at the end of each line?
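
For reference, a layout with trailing `&&` operators could look like the sketch below. Only the first clause of the condition is visible in the diff above; the other two clauses, the `ReplicationLoop` object, the `replicateTo` method, and the `upload` callback are assumptions made for illustration. Unlike the PR, this sketch does not re-prioritize the peer list after a failure.

```scala
object ReplicationLoop {
  // Hypothetical stand-in for the replication loop: peers are plain strings
  // and `upload` reports whether sending the block to a peer succeeded.
  def replicateTo(
      peers: List[String],
      numPeersToReplicateTo: Int,
      maxReplicationFailures: Int)(upload: String => Boolean): Set[String] = {
    var peersForReplication = peers
    var peersReplicatedTo = Set.empty[String]
    var numFailures = 0

    // Each && ends its line, so every continuation line starts with an operand.
    while (numFailures <= maxReplicationFailures &&
        peersForReplication.nonEmpty &&
        peersReplicatedTo.size < numPeersToReplicateTo) {
      val peer = peersForReplication.head
      peersForReplication = peersForReplication.tail
      if (upload(peer)) peersReplicatedTo += peer else numFailures += 1
    }
    peersReplicatedTo
  }
}
```

The 4-space continuation indent is a choice made here to keep the condition visually separate from the loop body.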




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74634093
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -138,9 +138,7 @@ class BlockManagerMasterEndpoint(
           }
       }
     
    -  private def getTopologyInfoForHost(host: String): String = {
    -    topologyMapper.getTopologyForHost(host)
    -  }
    +  private def getTopologyInfoForHost(h: String): String = topologyMapper.getTopologyForHost(h)
    --- End diff --
    
    I meant inline the function into the caller. It only seems to be used in one place.




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    still lgtm




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636461
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1079,109 +1103,97 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    -    val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    +    val numPeersToReplicateTo = level.replication - 1
    +
    +    @tailrec def replicateBlock(
    +      numFailures: Int,
    +      peersForReplication: Seq[BlockManagerId],
    +      peersReplicatedTo: Set[BlockManagerId],
    +      peersFailedToReplicateTo: Set[BlockManagerId]): Set[BlockManagerId] = {
    +
    +      if (numFailures > maxReplicationFailures
    +                || peersForReplication.isEmpty
    +                || peersReplicatedTo.size == numPeersToReplicateTo) {
    --- End diff --
    
    Although I cannot find documentation on indentation for this case, it might be better to be consistent, because I see you added
    
    ```scala
    executorId == id.executorId &&
      port == id.port &&
      host == id.host &&
      topologyInfo == id.topologyInfo
    ```
    
    So this might have to be
    
    ```scala
    if (numFailures > maxReplicationFailures
        || peersForReplication.isEmpty
        || peersReplicatedTo.size == numPeersToReplicateTo) {
    ```




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    You wouldn't have to create a new selector after a failure. That case can be detected by checking if the number of failed replications has increased, e.g. `if (failedReplications.length > prevNumFails) { reprioritize... }`. Basically, that state would be tracked in the selector, instead of BlockManager.
    
    I think the main benefit here is that the interface is more flexible as a developer facing API.
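
A rough sketch of the stateful selector described in this comment, under stated assumptions: `PeerSelector`, `next`, and the `prioritize` function parameter are hypothetical names, and peers are plain strings rather than `BlockManagerId`s. The selector remembers how many failures it has seen and re-prioritizes only when that number grows.

```scala
// Hypothetical stateful selector; `prioritize` stands in for whatever
// ordering policy is plugged in.
class PeerSelector(prioritize: Seq[String] => Seq[String]) {
  private var prevNumFails = 0
  private var initialized = false
  private var ordered: Seq[String] = Seq.empty

  // Returns the next peer to try, or None when no candidate is left.
  def next(
      peers: Seq[String],
      replicatedTo: Seq[String],
      failedReplications: Seq[String]): Option[String] = {
    // Re-prioritize on the first call and whenever a new failure is reported.
    if (!initialized || failedReplications.length > prevNumFails) {
      val excluded = (replicatedTo ++ failedReplications).toSet
      ordered = prioritize(peers.filterNot(p => excluded.contains(p)))
      prevNumFails = failedReplications.length
      initialized = true
    }
    val choice = ordered.headOption
    ordered = ordered.drop(1)
    choice
  }
}
```

In this shape the caller only reports what happened; deciding when a fresh ordering is needed stays inside the selector.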




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170720
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : '/myrack/myhost', where '/' is the topology delimiter,
    +   *         'myrack' is the topology identifier, and 'myhost' is the individual host.
    +   *         This function only returns the topology information without the hostname.
    +   *         This information can be used when choosing executors for block replication
    +   *         to discern executors from a different rack than a candidate executor, for example.
    +   *
    +   *         An implementation can choose to use empty strings or None in case topology info
    +   *         is not available. This would imply that all such executors belong to the same rack.
    +   */
    +  def getTopologyForHost(hostname: String): Option[String]
    +}
    +
    +@DeveloperApi
    +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    --- End diff --
    
    Add a simple classdoc here, something like "A TopologyMapper that assumes all nodes are in the same rack".
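
A sketch of how the suggested classdoc might sit on the class. The body of `DefaultTopologyMapper` is not shown in the diff, so returning `None` for every host is an assumption based on the scaladoc above; the `SparkConf` constructor parameter and the `Logging` mixin are dropped so the example compiles without Spark.

```scala
// Simplified stand-in for the abstract class in the diff; the SparkConf
// constructor parameter is omitted so the sketch compiles on its own.
abstract class TopologyMapper {
  def getTopologyForHost(hostname: String): Option[String]
}

/**
 * A TopologyMapper that assumes all nodes are in the same rack.
 */
class DefaultTopologyMapper extends TopologyMapper {
  // Assumed behavior: no topology information is available, so every host
  // maps to None, which callers treat as "same rack".
  override def getTopologyForHost(hostname: String): Option[String] = None
}
```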




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426552
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    --- End diff --
    
    Revert the change here and use a 4-space indent.




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636132
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala ---
    @@ -103,8 +121,11 @@ private[spark] object BlockManagerId {
        * @param port Port of the block manager.
        * @return A new [[org.apache.spark.storage.BlockManagerId]].
        */
    -  def apply(execId: String, host: String, port: Int): BlockManagerId =
    -    getCachedBlockManagerId(new BlockManagerId(execId, host, port))
    +  def apply(execId: String,
    +            host: String,
    +            port: Int,
    +            topologyInfo: Option[String] = None): BlockManagerId =
    --- End diff --
    
    Maybe we need to correct the indentation as below:
    
    ```scala
    def apply(execId: String,
      host: String,
      port: Int,
      topologyInfo: Option[String] = None): BlockManagerId =
    ```




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    Merging in master.





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426189
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  def prioritize(
    +    blockManagerId: BlockManagerId,
    +    peers: Seq[BlockManagerId],
    +    peersReplicatedTo: Set[BlockManagerId],
    +    blockId: BlockId): Seq[BlockManagerId]
    +}
    +
    +@DeveloperApi
    +class DefaultBlockReplicationPrioritization
    --- End diff --
    
    Instead of Default, I'd call this RandomBlockReplicationPrioritization to better reflect what it does.
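
For illustration, here is a minimal sketch of what a random prioritizer behind this trait might look like. `PeerId` and the simplified `Prioritizer` trait are hypothetical stand-ins for `BlockManagerId` and `BlockReplicationPrioritization`, so the snippet compiles without Spark. Filtering out already-used peers and seeding the shuffle with the block id are assumptions, not necessarily what the patch does.

```scala
import scala.util.Random

object RandomPrioritizerSketch {
  // Hypothetical stand-in for Spark's BlockManagerId.
  final case class PeerId(executorId: String, host: String, port: Int)

  // Simplified version of the trait under review; blockId is a plain String here.
  trait Prioritizer {
    def prioritize(
        self: PeerId,
        peers: Seq[PeerId],
        peersReplicatedTo: Set[PeerId],
        blockId: String): Seq[PeerId]
  }

  // Shuffles the remaining candidates with a generator seeded by the block id,
  // so the ordering is repeatable for the same block and the same peer list.
  class RandomPrioritizer extends Prioritizer {
    override def prioritize(
        self: PeerId,
        peers: Seq[PeerId],
        peersReplicatedTo: Set[PeerId],
        blockId: String): Seq[PeerId] = {
      val random = new Random(blockId.hashCode)
      // Assumption: peers that already hold a replica are not offered again.
      val candidates = peers.filterNot(peersReplicatedTo.contains)
      random.shuffle(candidates)
    }
  }
}
```

Seeding with the block id follows the scaladoc hint that `blockId` can be used as a source of randomness. Repeated attempts for the same block then see the same ordering as long as the peer list is unchanged.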



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    **[Test build #3232 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3232/consoleFull)** for PR 13152 at commit [`a0ed1e6`](https://github.com/apache/spark/commit/a0ed1e648a343f123bc41c1bdb94766b5a1e7938).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170505
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala ---
    @@ -347,6 +347,47 @@ class BlockManagerReplicationSuite extends SparkFunSuite
       }
     
       /**
    +   * Test if we get the required number of peers when using random sampling from
    +   * RandomBlockReplicationPolicy
    +   */
    +  test(s"block replication - random block replication policy") {
    +    val numBlockManagers = 10
    +    val storeSize = 1000
    +    val blockManagers = (1 to numBlockManagers).map { i =>
    +      BlockManagerId(s"store-$i", "localhost", 1000 + i, None)
    +    }
    +    val candidateBlockManager = BlockManagerId("test-store", "localhost", 1000, None)
    +    val replicationPolicy = new RandomBlockReplicationPolicy
    +    val blockId = "test-block"
    +
    +    (1 to 10).foreach {numReplicas =>
    +      logInfo(s"Num replicas : $numReplicas")
    --- End diff --
    
    Is this logging information useful in tests?





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73615218
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---
    @@ -50,12 +50,20 @@ class BlockManagerMaster(
         logInfo("Removal of executor " + execId + " requested")
       }
     
    -  /** Register the BlockManager's id with the driver. */
    +  /** Register the BlockManager's id with the driver. The input BlockManagerId does not contain
    +   * topology information. This information is obtained from the master and we respond with an
    +   * updated BlockManagerId fleshed out with this information.
    +   *
    --- End diff --
    
    extra newline




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    The state managed inside getRandomPeer() is also modified in a couple of other places, so moving some of it out of getRandomPeer would not be a very clean change. Even if that is done, I agree that your approach would only mean calling getNextPeer. It would, however, mean adding more state to ensure the expected behavior in cases where block replication fails on a peer.
    
    I am flexible about the implementation choices, so I can make the modifications if needed. Just to clarify the motivation for this interface, I have another PR, [SPARK-15354](https://github.com/apache/spark/pull/13932), that shows a couple of prioritizers that I intend to add (including a simple one that replicates HDFS's block replication strategy). Note that in case of failures, the list of peers is requested afresh from the master and prioritized again through this interface. Let me know what you think.
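
The failure handling described above can be sketched roughly as follows. This is a simplified model, not the code in the PR: `PeerId`, `fetchPeers`, `prioritize` and `upload` are hypothetical stand-ins for `BlockManagerId`, the lookup on the master, the pluggable prioritizer and the block upload.

```scala
import scala.annotation.tailrec

object ReplicationLoopSketch {
  // Hypothetical stand-in for Spark's BlockManagerId.
  final case class PeerId(executorId: String)

  // Returns the set of peers that ended up holding a replica.
  def replicate(
      numReplicas: Int,
      maxFailures: Int,
      fetchPeers: () => Seq[PeerId],
      prioritize: (Seq[PeerId], Set[PeerId]) => Seq[PeerId],
      upload: PeerId => Boolean): Set[PeerId] = {

    @tailrec
    def loop(
        failures: Int,
        candidates: Seq[PeerId],
        done: Set[PeerId],
        failed: Set[PeerId]): Set[PeerId] = {
      // Stop on too many failures, no candidates left, or enough replicas.
      if (failures > maxFailures || candidates.isEmpty || done.size == numReplicas) {
        done
      } else {
        val peer = candidates.head
        if (upload(peer)) {
          loop(failures, candidates.tail, done + peer, failed)
        } else {
          // On failure, fetch the peer list afresh, drop peers that were
          // already used or already failed, and prioritize what remains.
          val nowFailed = failed + peer
          val fresh = fetchPeers().filterNot(p => done.contains(p) || nowFailed.contains(p))
          loop(failures + 1, prioritize(fresh, done), done, nowFailed)
        }
      }
    }

    loop(0, prioritize(fetchPeers(), Set.empty), Set.empty, Set.empty)
  }
}
```

In this model the prioritizer itself holds no state between calls. The loop carries the sets of used and failed peers, so a fresh prioritization after a failure only needs the current peer list and the peers already replicated to.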




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73593762
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -159,8 +162,27 @@ private[spark] class BlockManager(
         blockTransferService.init(this)
         shuffleClient.init(appId)
     
    +    val topologyInfo = {
    +      val topologyStr = master.getTopologyInfo(blockTransferService.hostName)
    +      if (topologyStr == null || topologyStr.isEmpty) {
    +        None
    +      } else {
    +        Some(topologyStr)
    +      }
    +    }
    +
    +    blockReplicationPrioritizer = {
    +      val priorityClass = conf.get(
    +        "spark.replication.topologyawareness.prioritizer",
    +        "org.apache.spark.storage.DefaultBlockReplicationPrioritization")
    +      val clazz = Utils.classForName(priorityClass)
    +      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPrioritization]
    +      logInfo(s"Using $priorityClass for prioritizing peers")
    +      ret
    +    }
    +
         blockManagerId = BlockManagerId(
    -      executorId, blockTransferService.hostName, blockTransferService.port)
    +      executorId, blockTransferService.hostName, blockTransferService.port, topologyInfo)
    --- End diff --
    
    I implemented this in the commits below. 
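
The diff above picks the prioritizer class by name from the configuration and instantiates it reflectively. Below is a minimal sketch of that pattern without Spark on the classpath. The `Map`-based configuration and the helper names are hypothetical, and the type check before the cast is an addition for illustration, not something the patch does.

```scala
import scala.reflect.ClassTag

object PluginLoaderSketch {
  // Key and default value as they appear in the quoted diff.
  val PrioritizerKey = "spark.replication.topologyawareness.prioritizer"
  val DefaultPrioritizer = "org.apache.spark.storage.DefaultBlockReplicationPrioritization"

  // Hypothetical helper: a plain Map stands in for SparkConf here.
  def configuredClassName(conf: Map[String, String]): String =
    conf.getOrElse(PrioritizerKey, DefaultPrioritizer)

  // Loads a class by name and creates an instance through its no-arg constructor.
  // The check fails fast with a readable message if the class has the wrong type.
  def instantiate[T](className: String)(implicit tag: ClassTag[T]): T = {
    val clazz = Class.forName(className)
    require(
      tag.runtimeClass.isAssignableFrom(clazz),
      s"$className is not a ${tag.runtimeClass.getName}")
    clazz.getDeclaredConstructor().newInstance().asInstanceOf[T]
  }
}
```

For example, `PluginLoaderSketch.instantiate[java.util.List[String]]("java.util.ArrayList")` yields an empty list, while asking for a `java.util.Map` with the same class name is rejected by the `require` check.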




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73615629
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala ---
    @@ -111,4 +111,6 @@ private[spark] object BlockManagerMessages {
       case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
     
       case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster
    +
    +  case class GetTopologyInfo(host: String) extends ToBlockManagerMaster
    --- End diff --
    
    Is this message still used?




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    LGTM - sorry that this has taken a while. I will merge once tests pass.
    
    Also cc @zsxwing for his attention.





[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636272
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.apache.spark.annotation.DeveloperApi
    +
    +import scala.util.Random
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  def prioritize(peers: Seq[BlockManagerId],
    +                 peersReplicatedTo: Set[BlockManagerId],
    +                 blockId: BlockId): Seq[BlockManagerId]
    --- End diff --
    
    Maybe indentation as below:
    
    ```scala 
    def prioritize(peers: Seq[BlockManagerId],
      peersReplicatedTo: Set[BlockManagerId],
      blockId: BlockId): Seq[BlockManagerId]
    ```




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636534
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1079,109 +1103,97 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    -    val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    +    val numPeersToReplicateTo = level.replication - 1
    +
    +    @tailrec def replicateBlock(
    +      numFailures: Int,
    +      peersForReplication: Seq[BlockManagerId],
    +      peersReplicatedTo: Set[BlockManagerId],
    +      peersFailedToReplicateTo: Set[BlockManagerId]): Set[BlockManagerId] = {
    +
    +      if (numFailures > maxReplicationFailures
    +                || peersForReplication.isEmpty
    +                || peersReplicatedTo.size == numPeersToReplicateTo) {
    +        // This selection of a peer and replication is continued in a loop until one of the
    +        // following 3 conditions is fulfilled:
    +        // (i) specified number of peers have been replicated to
    +        // (ii) too many failures in replicating to peers
    +        // (iii) no peer left to replicate to
    +        peersReplicatedTo
    +      } else {
    +        val peer = peersForReplication.head
    +        val (updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers) = try {
    +          val onePeerStartTime = System.currentTimeMillis
    +          logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +          blockTransferService.uploadBlockSync(
    +            peer.host,
    +            peer.port,
    +            peer.executorId,
    +            blockId,
    +            new NettyManagedBuffer(data.toNetty),
    +            tLevel,
    +            classTag)
    +          logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +            s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +          // the block was replicated, lets update state and move ahead
    +          (numFailures,
    +            peersForReplication.tail,
    +            peersReplicatedTo + peer,
    +            peersFailedToReplicateTo)
    +        } catch {
    +          case e: Exception =>
    +            logWarning(s"Failed to replicate $blockId to $peer, failure #$numFailures", e)
    +            val updatedFailedPeers = peersFailedToReplicateTo + peer
    +            // we have a failed replication, so we get the list of peers again
    +            // we don't want peers we have already replicated to and the ones that
    +            // have failed previously
    +            val filteredPeers = getPeers(true).filter{p =>
    --- End diff --
    
    Maybe as below (there is a good example here: [scala-style-guide#pattern-matching](https://github.com/databricks/scala-style-guide#pattern-matching)):
    ```scala
    val filteredPeers = getPeers(true).filter { p =>
    ```




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161892
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import java.io.{File, FileInputStream}
    +import java.util.Properties
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
    +   *         ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
    +   *         This function only returns the topology information without the hostname.
    +   */
    +  def getTopologyForHost(hostname: String): String
    +}
    +
    +@DeveloperApi
    +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  override def getTopologyForHost(hostname: String): String = {
    +    logDebug(s"Got a request for $hostname")
    +    "DefaultRack"
    +  }
    +}
    +
    +/**
    + * A simple file based topology mapper. This expects topology information provided as a
    + * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property
    + * `spark.replication.topologyawareness.topologyfile`. To use this topology mapper, set the
    --- End diff --
    
    s/topologyfile/topologyFile
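
As a rough illustration of the file-based mapper discussed here, the sketch below reads host-to-topology pairs in `java.util.Properties` syntax. The quoted diff is cut off before it describes the file layout, so the `host=topology` format, the object name and both helper names are assumptions. The text is passed in directly instead of being read from the configured file.

```scala
import java.io.StringReader
import java.util.Properties

object FileTopologySketch {
  // Parses host=topology pairs written in java.util.Properties syntax.
  def loadTopology(text: String): Properties = {
    val props = new Properties()
    props.load(new StringReader(text))
    props
  }

  // Returns the topology for a host, or None when the host is not listed.
  def topologyForHost(props: Properties, hostname: String): Option[String] =
    Option(props.getProperty(hostname))
}
```

Returning an `Option` instead of a possibly null `String` lets the caller decide how to treat unknown hosts, similar to how the `BlockManager` diff turns a null or empty topology string into `None`.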




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    Thanks - this looks pretty good!
    
    I've triggered a new Jenkins run and also left some small comments. It would be great to add some unit tests (not integration tests) for two of the classes introduced.




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170783
  
    --- Diff: core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala ---
    @@ -37,6 +35,8 @@ import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
     import org.apache.spark.shuffle.sort.SortShuffleManager
     import org.apache.spark.storage.StorageLevel._
     
    +import scala.collection.mutable
    --- End diff --
    
    I think this will fail the style checker for import order.




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426476
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala ---
    @@ -101,10 +117,18 @@ private[spark] object BlockManagerId {
        * @param execId ID of the executor.
        * @param host Host name of the block manager.
        * @param port Port of the block manager.
    +   * @param topologyInfo topology information for the blockmanager, if available
    +   *                     This can be network topology information for use while choosing peers
    +   *                     while replicating data blocks. More information available here:
    +   *                     [[org.apache.spark.storage.TopologyMapper]]
        * @return A new [[org.apache.spark.storage.BlockManagerId]].
        */
    -  def apply(execId: String, host: String, port: Int): BlockManagerId =
    -    getCachedBlockManagerId(new BlockManagerId(execId, host, port))
    +  def apply(
    +    execId: String,
    --- End diff --
    
    4 space indent here too




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426199
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  def prioritize(
    +    blockManagerId: BlockManagerId,
    +    peers: Seq[BlockManagerId],
    --- End diff --
    
    Is passing in all the peers a performance concern?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426147
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,80 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks. BlockManager will replicate to each peer returned in order until the
    + * desired replication order is reached. If a replication fails, prioritize() will be called
    + * again to get a fresh prioritization.
    + */
    +@DeveloperApi
    +trait BlockReplicationPrioritization {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  def prioritize(
    +    blockManagerId: BlockManagerId,
    +    peers: Seq[BlockManagerId],
    +    peersReplicatedTo: Set[BlockManagerId],
    +    blockId: BlockId): Seq[BlockManagerId]
    +}
    +
    +@DeveloperApi
    +class DefaultBlockReplicationPrioritization
    +  extends BlockReplicationPrioritization
    +  with Logging {
    +
    +  /**
    +   * Method to prioritize a bunch of candidate peers of a block. This is a basic implementation,
    +   * that just makes sure we put blocks on different hosts, if possible
    +   *
    +   * @param blockManagerId Id of the current BlockManager for self identification
    +   * @param peers A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId BlockId of the block being replicated. This can be used as a source of
    +   *                randomness if needed.
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
    +   */
    +  override def prioritize(
    +    blockManagerId: BlockManagerId,
    --- End diff --
    
    The Spark style for indentation is to use 4 spaces for function arguments, i.e.
    ```scala
    override def prioritize(
        blockManagerId: BlockManagerId,
        peers: Seq[BlockManagerId],
        peersReplicatedTo: Set[BlockManagerId],
        blockId: BlockId): Seq[BlockManagerId] = {
      val random = new Random(blockId.hashCode)
      ...
    }
    ```




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426383
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---
    @@ -50,12 +50,20 @@ class BlockManagerMaster(
         logInfo("Removal of executor " + execId + " requested")
       }
     
    -  /** Register the BlockManager's id with the driver. */
    +  /**
    +   * Register the BlockManager's id with the driver. The input BlockManagerId does not contain
    +   * topology information. This information is obtained from the master and we respond with an
    +   * updated BlockManagerId fleshed out with this information.
    +   */
       def registerBlockManager(
    -      blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {
    +    blockManagerId: BlockManagerId,
    --- End diff --
    
    indent 4 spaces




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63636491
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1079,109 +1103,97 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    -    val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    +    val numPeersToReplicateTo = level.replication - 1
    +
    +    @tailrec def replicateBlock(
    +      numFailures: Int,
    +      peersForReplication: Seq[BlockManagerId],
    +      peersReplicatedTo: Set[BlockManagerId],
    +      peersFailedToReplicateTo: Set[BlockManagerId]): Set[BlockManagerId] = {
    +
    +      if (numFailures > maxReplicationFailures
    +                || peersForReplication.isEmpty
    +                || peersReplicatedTo.size == numPeersToReplicateTo) {
    +        // This selection of a peer and replication is continued in a loop until one of the
    +        // following 3 conditions is fulfilled:
    +        // (i) specified number of peers have been replicated to
    +        // (ii) too many failures in replicating to peers
    +        // (iii) no peer left to replicate to
    +        peersReplicatedTo
    +      } else {
    +        val peer = peersForReplication.head
    +        val (updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers) = try {
    +          val onePeerStartTime = System.currentTimeMillis
    +          logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +          blockTransferService.uploadBlockSync(
    +            peer.host,
    +            peer.port,
    +            peer.executorId,
    +            blockId,
    +            new NettyManagedBuffer(data.toNetty),
    +            tLevel,
    +            classTag)
    +          logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +            s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +          // the block was replicated, lets update state and move ahead
    +          (numFailures,
    +            peersForReplication.tail,
    +            peersReplicatedTo + peer,
    +            peersFailedToReplicateTo)
    +        } catch {
    +          case e: Exception =>
    --- End diff --
    
    I just wonder whether it is safe to catch all exceptions here.
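
    One option, sketched here as an assumption rather than something this patch does, is to match on `scala.util.control.NonFatal`, so that `InterruptedException` and fatal JVM errors still propagate while ordinary failures are counted against the peer. `ReplicationAttempt` and `attempt` are hypothetical names used only for illustration:

    ```scala
    import scala.util.control.NonFatal

    object ReplicationAttempt {
      // Hypothetical helper: runs one upload attempt and reports whether it succeeded.
      def attempt(upload: () => Unit): Boolean = {
        try {
          upload()
          true
        } catch {
          // NonFatal does not match InterruptedException, VirtualMachineError,
          // LinkageError or ControlThrowable, so those are rethrown to the caller.
          case NonFatal(_) => false
        }
      }
    }
    ```

    With this shape, a failed upload simply returns `false`, while an interrupt of the replicating thread is not swallowed.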




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    LGTM. Just style comments




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426446
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -298,7 +310,17 @@ class BlockManagerMasterEndpoint(
         ).map(_.flatten.toSeq)
       }
     
    -  private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {
    +  private def register(dummyId: BlockManagerId,
    +    maxMemSize: Long,
    --- End diff --
    
    Can you also add a method doc saying this returns the same id with topology information attached?





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426531
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala ---
    @@ -69,24 +72,37 @@ class BlockManagerId private (
         out.writeUTF(executorId_)
         out.writeUTF(host_)
         out.writeInt(port_)
    +    out.writeBoolean(topologyInfo_.isDefined)
    +    // we only write topologyInfo if we have it
    +    topologyInfo.foreach(out.writeUTF(_))
       }
     
       override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
         executorId_ = in.readUTF()
         host_ = in.readUTF()
         port_ = in.readInt()
    +    val isTopologyInfoAvailable = in.readBoolean()
    +    topologyInfo_ = if (isTopologyInfoAvailable) {
    --- End diff --
    
    It might be clearer to do
    ```
    if (isTopologyInfoAvailable) {
      topologyInfo_ = Option(in.readUTF())
    } else {
      topologyInfo_ = None
    }
    ```
    
    or
    
    ```
    topologyInfo_ = if (isTopologyInfoAvailable) Option(in.readUTF()) else None
    ```




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r72685380
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -159,8 +162,27 @@ private[spark] class BlockManager(
         blockTransferService.init(this)
         shuffleClient.init(appId)
     
    +    val topologyInfo = {
    +      val topologyStr = master.getTopologyInfo(blockTransferService.hostName)
    +      if (topologyStr == null || topologyStr.isEmpty) {
    +        None
    +      } else {
    +        Some(topologyStr)
    +      }
    +    }
    +
    +    blockReplicationPrioritizer = {
    +      val priorityClass = conf.get(
    +        "spark.replication.topologyawareness.prioritizer",
    +        "org.apache.spark.storage.DefaultBlockReplicationPrioritization")
    +      val clazz = Utils.classForName(priorityClass)
    +      val ret = clazz.newInstance.asInstanceOf[BlockReplicationPrioritization]
    +      logInfo(s"Using $priorityClass for prioritizing peers")
    +      ret
    +    }
    +
         blockManagerId = BlockManagerId(
    -      executorId, blockTransferService.hostName, blockTransferService.port)
    +      executorId, blockTransferService.hostName, blockTransferService.port, topologyInfo)
    --- End diff --
    
    Would it work if topologyInfo was sent back from the master when `registerBlockManager` is called? It doesn't seem that anything uses `blockManagerId` until registration finishes. That way we wouldn't need this weird two-step registration.
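
    Roughly, as a sketch with simplified stand-in types (`PeerId` and `register` here are hypothetical, not the actual `BlockManagerId` or master API), registration could return the same id with the topology filled in:

    ```scala
    object RegistrationSketch {
      // Simplified stand-in for BlockManagerId; topologyInfo is unknown before registration.
      final case class PeerId(
          executorId: String,
          host: String,
          port: Int,
          topologyInfo: Option[String] = None)

      // Hypothetical master-side registration: resolves the topology for the host and
      // returns the id with that information attached. Null or empty results map to None.
      def register(id: PeerId, topologyForHost: String => String): PeerId = {
        val topology = Option(topologyForHost(id.host)).filter(_.nonEmpty)
        id.copy(topologyInfo = topology)
      }
    }
    ```

    The executor would then keep the returned id instead of building one from a separate topology lookup.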




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    > The topology info is only queried when the executor initiates and is assumed to stay the same throughout the life of the executor. Depending on the cluster manager being used, I am assuming the exact way this information is provided may differ. Resolving this at the master makes this implementation simpler as only the master needs to be able to access the service/script/class being used to resolve the topology. The communication overhead is minimal as the executors do have to communicate with the master when they initiate anyways.
    
    I see, that makes sense, though it is a little odd to ask the master for information that you then immediately use to register with it.
    
    > The getRandomPeer() method was doing quite a bit more than just getting a random peer. It was being used to manage/mutate state, which was being mutated in other places as well. I tried to keep the block placement strategy and the usage of its output separate, to make it simpler to provide a new block placement strategy. I also thought it would be best to de-couple any internal replication state management with the block replication strategy, while still keeping the structure of the state the same.
    
    Still, I think it would be a smaller change to just move some of that logic out of getRandomPeer(), and retain the rest. Then you just need to implement getNextPeer(), and BlockManager doesn't need to worry about tracking the prioritized order internally.
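
    A rough sketch of that shape, with hypothetical names (`PeerSelector`, `RandomSelector`, `replicate`) and a boolean `upload` callback standing in for the real block transfer:

    ```scala
    import scala.util.Random

    object NextPeerSketch {
      // Hypothetical pluggable piece: picks the next peer from the remaining candidates.
      trait PeerSelector[P] {
        def getNextPeer(candidates: Seq[P]): Option[P]
      }

      // Random choice seeded per block, so the selection is deterministic for a given seed.
      class RandomSelector[P](seed: Int) extends PeerSelector[P] {
        private val random = new Random(seed)
        override def getNextPeer(candidates: Seq[P]): Option[P] =
          if (candidates.isEmpty) None else Some(candidates(random.nextInt(candidates.size)))
      }

      // The loop keeps its own bookkeeping; the selector only answers "who is next?".
      def replicate[P](
          peers: Seq[P],
          numPeersToReplicateTo: Int,
          maxFailures: Int,
          selector: PeerSelector[P])(upload: P => Boolean): Set[P] = {
        var remaining = peers
        var replicated = Set.empty[P]
        var failures = 0
        var exhausted = false
        while (!exhausted && replicated.size < numPeersToReplicateTo && failures <= maxFailures) {
          selector.getNextPeer(remaining) match {
            case Some(peer) =>
              // A peer is tried at most once, whether the upload succeeds or fails.
              remaining = remaining.filterNot(_ == peer)
              if (upload(peer)) {
                replicated += peer
              } else {
                failures += 1
              }
            case None =>
              exhausted = true
          }
        }
        replicated
      }
    }
    ```

    This sketch leaves out re-fetching the peer list from the driver after a failure, which the existing loop does.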




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170745
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---
    @@ -55,10 +55,22 @@ class BlockManagerMasterEndpoint(
       private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
       private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
     
    +  private val topologyMapper = {
    +    val topologyMapperClassName = conf.get(
    +      "spark.replication.topologyawareness.topologyMapper",
    --- End diff --
    
    ```
    spark.storage.replication.topologyMapper
    ```




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170655
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -160,8 +164,25 @@ private[spark] class BlockManager(
         blockTransferService.init(this)
         shuffleClient.init(appId)
     
    -    blockManagerId = BlockManagerId(
    -      executorId, blockTransferService.hostName, blockTransferService.port)
    +    blockReplicationPolicy = {
    +      val priorityClass = conf.get(
    +        "spark.replication.topologyawareness.prioritizer",
    +        "org.apache.spark.storage.RandomBlockReplicationPolicy")
    --- End diff --
    
    classOf[RandomBlockReplicationPolicy].getName
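
    That is, derive the default from the class itself so that a rename cannot leave a stale string behind. A self-contained sketch, using a plain `Map` in place of `SparkConf` and hypothetical names (`PolicyLoader`, `ReplicationPolicy`, `RandomPolicy`, and the `example.replication.policy` key):

    ```scala
    object PolicyLoader {
      trait ReplicationPolicy
      class RandomPolicy extends ReplicationPolicy

      // Hypothetical config key, used only for this illustration.
      val PolicyKey = "example.replication.policy"

      def load(conf: Map[String, String]): ReplicationPolicy = {
        // The default class name comes from classOf, not from a hand-written string.
        val className = conf.getOrElse(PolicyKey, classOf[RandomPolicy].getName)
        Class.forName(className)
          .getDeclaredConstructor()
          .newInstance()
          .asInstanceOf[ReplicationPolicy]
      }
    }
    ```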




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    **[Test build #3225 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3225/consoleFull)** for PR 13152 at commit [`9b8ce32`](https://github.com/apache/spark/commit/9b8ce3229d0cff64e77d55563cec3cc3cda29182).
     * This patch **fails Spark unit tests**.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73615142
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala ---
    @@ -69,24 +72,39 @@ class BlockManagerId private (
         out.writeUTF(executorId_)
         out.writeUTF(host_)
         out.writeInt(port_)
    +    out.writeBoolean(topologyInfo_.isDefined)
    +    // if we don't keep topology information, we just write an empty string.
    +    out.writeUTF(topologyInfo_.getOrElse(""))
    --- End diff --
    
    You can just omit this write if it's missing?
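
    A minimal round-trip sketch of that, using plain `java.io` object streams rather than `BlockManagerId` itself (`OptionalFieldCodec` is a hypothetical name). The boolean flag tells the reader whether a string follows, so nothing needs to be written for a missing value:

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import java.io.{ObjectInputStream, ObjectOutputStream}

    object OptionalFieldCodec {
      // Writes a presence flag, then the value only when it is defined.
      def write(topologyInfo: Option[String]): Array[Byte] = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        try {
          out.writeBoolean(topologyInfo.isDefined)
          topologyInfo.foreach(out.writeUTF)
        } finally {
          out.close()
        }
        bytes.toByteArray
      }

      // Reads the flag first and consumes the string only if the flag says it is there.
      def read(data: Array[Byte]): Option[String] = {
        val in = new ObjectInputStream(new ByteArrayInputStream(data))
        try {
          if (in.readBoolean()) Option(in.readUTF()) else None
        } finally {
          in.close()
        }
      }
    }
    ```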




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161774
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import java.io.{File, FileInputStream}
    +import java.util.Properties
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
    +   *         ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
    +   *         This function only returns the topology information without the hostname.
    +   */
    +  def getTopologyForHost(hostname: String): String
    +}
    +
    +@DeveloperApi
    +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  override def getTopologyForHost(hostname: String): String = {
    +    logDebug(s"Got a request for $hostname")
    +    "DefaultRack"
    +  }
    +}
    +
    +/**
    + * A simple file based topology mapper. This expects topology information provided as a
    + * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property
    + * `spark.replication.topologyawareness.topologyfile`. To use this topology mapper, set the
    + * `spark.replication.topologyawareness.topologyMapper` property to
    + * [[org.apache.spark.storage.FileBasedTopologyMapper]]
    + * @param conf SparkConf object
    + */
    +@DeveloperApi
    +class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +
    +  val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile")
    +  require(topologyFile.isDefined, "Please provide topology file for FileBasedTopologyMapper.")
    +  val topologyMap = Utils.getPropertiesFromFile(topologyFile.get)
    +
    +  override def getTopologyForHost(hostname: String): String = {
    +    val topology = topologyMap.get(hostname)
    +    if(topology.isDefined) {
    --- End diff --
    
    space after the if




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    **[Test build #3291 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3291/consoleFull)** for PR 13152 at commit [`632d043`](https://github.com/apache/spark/commit/632d0436ca701031e03bd141b95d4b0bb5544150).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426781
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(numFailures <= maxReplicationFailures &&
    +      !peersForReplication.isEmpty &&
    +      peersReplicatedTo.size != numPeersToReplicateTo) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    +        logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +        blockTransferService.uploadBlockSync(
    +          peer.host,
    +          peer.port,
    +          peer.executorId,
    +          blockId,
    +          new NettyManagedBuffer(data.toNetty),
    +          tLevel,
    +          classTag)
    +        logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +          s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +        peersForReplication = peersForReplication.tail
    +        peersReplicatedTo += peer
    +      } catch {
    +        case e: Exception =>
    --- End diff --
    
    catch NonFatal here rather than general Exception?
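
A minimal sketch of what this suggestion could look like, using `scala.util.control.NonFatal` from the standard library. The `upload` and `tryUpload` helpers and the peer names are hypothetical stand-ins, not code from the PR. The main practical difference from `case e: Exception` is that `InterruptedException` is no longer swallowed.

```scala
import scala.util.control.NonFatal

object NonFatalSketch {
  // Hypothetical stand-in for a block upload that may fail.
  def upload(peer: String): Unit =
    if (peer.startsWith("bad")) throw new java.io.IOException(s"cannot reach $peer")

  // Returns true if the upload succeeded, false on a recoverable failure.
  def tryUpload(peer: String): Boolean =
    try {
      upload(peer)
      true
    } catch {
      // NonFatal does not match VirtualMachineError (e.g. OutOfMemoryError),
      // InterruptedException, LinkageError or ControlThrowable, so those propagate.
      case NonFatal(e) =>
        println(s"Failed to replicate to $peer: ${e.getMessage}")
        false
    }

  def main(args: Array[String]): Unit = {
    println(tryUpload("good-host"))
    println(tryUpload("bad-host"))
  }
}
```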





[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161782
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import java.io.{File, FileInputStream}
    +import java.util.Properties
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.SparkConf
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
    +   *         ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
    +   *         This function only returns the topology information without the hostname.
    +   */
    +  def getTopologyForHost(hostname: String): String
    +}
    +
    +@DeveloperApi
    +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  override def getTopologyForHost(hostname: String): String = {
    +    logDebug(s"Got a request for $hostname")
    +    "DefaultRack"
    +  }
    +}
    +
    +/**
    + * A simple file based topology mapper. This expects topology information provided as a
    + * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property
    + * `spark.replication.topologyawareness.topologyfile`. To use this topology mapper, set the
    + * `spark.replication.topologyawareness.topologyMapper` property to
    + * [[org.apache.spark.storage.FileBasedTopologyMapper]]
    + * @param conf SparkConf object
    + */
    +@DeveloperApi
    +class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +
    +  val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile")
    +  require(topologyFile.isDefined, "Please provide topology file for FileBasedTopologyMapper.")
    --- End diff --
    
    newline




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426714
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(numFailures <= maxReplicationFailures &&
    +      !peersForReplication.isEmpty &&
    --- End diff --
    
    Might be good to indent 2 more spaces here so it doesn't look like the body of the loop.
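
To illustrate the layout being asked for, here is a small self-contained sketch. The variables are simplified stand-ins (peers are plain strings) and are not the PR's actual code; only the indentation of the condition is the point.

```scala
object WhileIndentSketch {
  // Picks peers in order until `target` replicas exist or the peers run out.
  def run(): Set[String] = {
    val maxFailures = 1
    val numFailures = 0
    val target = 2
    var peers = List("a", "b", "c")
    var replicated = Set.empty[String]

    // Continuation lines of the condition are indented 4 spaces,
    // the body only 2, so the two are visually distinct.
    while (numFailures <= maxFailures &&
        peers.nonEmpty &&
        replicated.size != target) {
      replicated += peers.head
      peers = peers.tail
    }
    replicated
  }

  def main(args: Array[String]): Unit = println(run())
}
```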




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r75426753
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1088,109 +1108,86 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    +
    +    val numPeersToReplicateTo = level.replication - 1
    +
         val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    -            }
    -          } catch {
    -            case e: Exception =>
    -              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
    -              failures += 1
    -              replicationFailed = true
    -              peersFailedToReplicateTo += peer
    -              if (failures > maxReplicationFailures) { // too many failures in replicating to peers
    -                done = true
    -              }
    +    var peersForReplication =
    +      blockReplicationPrioritizer.prioritize(blockManagerId, getPeers(false), Set.empty, blockId)
    +    var peersReplicatedTo = Set.empty[BlockManagerId]
    +    var peersFailedToReplicateTo = Set.empty[BlockManagerId]
    +    var numFailures = 0
    +
    +    while(numFailures <= maxReplicationFailures &&
    +      !peersForReplication.isEmpty &&
    +      peersReplicatedTo.size != numPeersToReplicateTo) {
    +      val peer = peersForReplication.head
    +      try {
    +        val onePeerStartTime = System.currentTimeMillis
    --- End diff --
    
    I understand you didn't introduce this problem, but do you mind changing this to use nanoTime while you are at it?
    
    For more information see https://github.com/databricks/scala-style-guide#misc_currentTimeMillis_vs_nanoTime
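
A minimal sketch of the requested change, assuming the goal is only to measure elapsed time. `System.nanoTime` is intended for interval measurement and is unaffected by wall-clock adjustments, unlike `System.currentTimeMillis`. The `timed` helper is hypothetical and not part of the PR.

```scala
import java.util.concurrent.TimeUnit

object ElapsedTimeSketch {
  // Runs `body` and returns its result together with the elapsed time in milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    (result, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    val (sum, ms) = timed((1 to 1000).sum)
    println(s"Computed $sum in $ms ms")
  }
}
```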




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r73615208
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala ---
    @@ -50,12 +50,20 @@ class BlockManagerMaster(
         logInfo("Removal of executor " + execId + " requested")
       }
     
    -  /** Register the BlockManager's id with the driver. */
    +  /** Register the BlockManager's id with the driver. The input BlockManagerId does not contain
    --- End diff --
    
    use javadoc style here
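
As I understand the convention, "javadoc style" for a multi-line comment means putting `/**` on its own line and starting each following line with ` * `. A sketch on a hypothetical method (the name, parameters and behaviour are made up for illustration):

```scala
object DocStyleSketch {
  /**
   * Registers an executor id and returns the id tagged with its rack.
   * Both the method and its parameters are hypothetical.
   */
  def register(execId: String, rack: String): String = s"$execId@$rack"

  def main(args: Array[String]): Unit = println(register("exec-1", "rack-a"))
}
```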




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r76170605
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.Utils
    +
    +/**
    + * ::DeveloperApi::
    + * TopologyMapper provides topology information for a given host
    + * @param conf SparkConf to get required properties, if needed
    + */
    +@DeveloperApi
    +abstract class TopologyMapper(conf: SparkConf) {
    +  /**
    +   * Gets the topology information given the host name
    +   *
    +   * @param hostname Hostname
    +   * @return topology information for the given hostname. One can use a 'topology delimiter'
    +   *         to make this topology information nested.
    +   *         For example : ‘/myrack/myhost’, where ‘/’ is the topology delimiter,
    +   *         ‘myrack’ is the topology identifier, and ‘myhost’ is the individual host.
    +   *         This function only returns the topology information without the hostname.
    +   *         This information can be used when choosing executors for block replication
    +   *         to discern executors from a different rack than a candidate executor, for example.
    +   *
    +   *         An implementation can choose to use empty strings or None in case topology info
    +   *         is not available. This would imply that all such executors belong to the same rack.
    +   */
    +  def getTopologyForHost(hostname: String): Option[String]
    +}
    +
    +@DeveloperApi
    +class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  override def getTopologyForHost(hostname: String): Option[String] = {
    +    logDebug(s"Got a request for $hostname")
    +    Some("DefaultRack")
    +  }
    +}
    +
    +/**
    + * A simple file based topology mapper. This expects topology information provided as a
    + * [[java.util.Properties]] file. The name of the file is obtained from SparkConf property
    + * `spark.replication.topologyawareness.topologyFile`. To use this topology mapper, set the
    + * `spark.replication.topologyawareness.topologyMapper` property to
    + * [[org.apache.spark.storage.FileBasedTopologyMapper]]
    + * @param conf SparkConf object
    + */
    +@DeveloperApi
    +class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging {
    +  val topologyFile = conf.getOption("spark.replication.topologyawareness.topologyfile")
    +  require(topologyFile.isDefined, "Please provide topology file for FileBasedTopologyMapper.")
    --- End diff --
    
    "Please specify topology file via spark.storage.replication.topologyFile for FileBasedTopologyMapper."

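A rough sketch of how the suggested message and the properties-file lookup might fit together. A plain `Map` stands in for `SparkConf`, the key name is taken from the suggestion above and should be treated as an assumption, and `topologyFilePath` and `parseTopology` are hypothetical helpers rather than the PR's code.

```scala
import java.io.StringReader
import java.util.Properties

object TopologyFileSketch {
  // Key name as proposed in the review comment; an assumption, not a confirmed setting.
  val TopologyFileKey = "spark.storage.replication.topologyFile"

  // `conf` is a plain Map standing in for SparkConf.
  def topologyFilePath(conf: Map[String, String]): String = {
    val path = conf.get(TopologyFileKey)
    require(path.isDefined,
      s"Please specify topology file via $TopologyFileKey for FileBasedTopologyMapper.")
    path.get
  }

  // Parses `host=topology` lines written in java.util.Properties format.
  def parseTopology(contents: String): Map[String, String] = {
    val props = new Properties()
    props.load(new StringReader(contents))
    val names = props.stringPropertyNames().iterator()
    var result = Map.empty[String, String]
    while (names.hasNext) {
      val host = names.next()
      result += host -> props.getProperty(host)
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val topology = parseTopology("host1=/rack1\nhost2=/rack2")
    println(topology.get("host1")) // defined for a known host
    println(topology.get("host9")) // None for an unknown host
  }
}
```

Naming the configuration key in the error message tells the user exactly which setting is missing when the `require` fails.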



[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r74161515
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPrioritization.scala ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.storage
    +
    +import scala.util.Random
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.internal.Logging
    +
    +/**
    + * ::DeveloperApi::
    + * BlockReplicationPrioritization provides logic for prioritizing a sequence of peers for
    + * replicating blocks
    --- End diff --
    
    Add: BlockManager will replicate to each returned peer in order until the desired replication level is reached. If a replication fails, `prioritize()` will be called again to get a fresh prioritization.
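
    A minimal sketch of the contract described above, assuming nothing about the actual Spark classes: `PeerId`, `Prioritizer`, `RandomPrioritizer`, `replicate` and the `upload` callback are hypothetical stand-ins, not names from the patch. It tries peers in the returned order and asks for a fresh prioritization of the untried peers after each failure.

    ```scala
    import scala.annotation.tailrec
    import scala.util.{Random, Try}

    object ReplicationLoopSketch {
      // Hypothetical stand-in for Spark's BlockManagerId.
      final case class PeerId(host: String, port: Int)

      // Hypothetical prioritizer: returns candidate peers in preference order.
      trait Prioritizer {
        def prioritize(peers: Seq[PeerId]): Seq[PeerId]
      }

      // One possible default: a seeded random shuffle of the candidates.
      final class RandomPrioritizer(seed: Long) extends Prioritizer {
        private val random = new Random(seed)
        def prioritize(peers: Seq[PeerId]): Seq[PeerId] = random.shuffle(peers)
      }

      // Best-effort replication: returns the peers that accepted the block.
      def replicate(
          allPeers: () => Seq[PeerId],
          prioritizer: Prioritizer,
          upload: PeerId => Unit,
          numReplicas: Int,
          maxFailures: Int): Set[PeerId] = {

        @tailrec
        def loop(
            ordered: Seq[PeerId],
            done: Set[PeerId],
            failed: Set[PeerId],
            failures: Int): Set[PeerId] = {
          // Stop when enough replicas exist, no peers remain, or failures exceed the limit.
          if (done.size >= numReplicas || ordered.isEmpty || failures > maxFailures) {
            done
          } else {
            val peer = ordered.head
            if (Try(upload(peer)).isSuccess) {
              loop(ordered.tail, done + peer, failed, failures)
            } else {
              // On failure, re-fetch the peers, drop the ones already tried, and re-prioritize.
              val nowFailed = failed + peer
              val remaining = allPeers().filterNot(p => done(p) || nowFailed(p))
              loop(prioritizer.prioritize(remaining), done, nowFailed, failures + 1)
            }
          }
        }

        loop(prioritizer.prioritize(allPeers()), Set.empty, Set.empty, 0)
      }
    }
    ```

    `Try` only catches non-fatal exceptions, so a fatal error such as `OutOfMemoryError` would still propagate out of this sketch.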




[GitHub] spark pull request #13152: [SPARK-15353] [CORE] Making peer selection for bl...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/13152




[GitHub] spark issue #13152: [SPARK-15353] [CORE] Making peer selection for block rep...

Posted by ericl <gi...@git.apache.org>.
Github user ericl commented on the issue:

    https://github.com/apache/spark/pull/13152
  
    A couple of high-level questions:
    - Rather than sending an RPC to the master to ask for a worker's topology info, could this be provided at initialization time or determined from the environment?
    
    - Is it possible to narrow the prioritizer's interface so that it just chooses a single next peer? If caching the prioritization order is desirable, that can be done internally within the prioritizer. For example, the interface could look something like the following; the default prioritizer would then not need to randomly shuffle the entire peer list to choose its target.
    
    ```
    trait BlockReplicationStrategy {
    
      trait ReplicationTargetSelector {
        def getNextPeer(
          candidatePeers: Set[BlockManagerId],
          successfulReplications: Set[BlockManagerId],
          failedReplications: Set[BlockManagerId]): Option[BlockManagerId]
      }
    
      def getTargetSelector(
        localId: BlockManagerId,
        blockId: BlockId,
        level: StorageLevel): ReplicationTargetSelector
    }
    ```
    
    Also, the patch would be smaller if only the `getRandomPeer()` call were changed.
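
    To illustrate the narrower interface suggested above, here is a hedged sketch of a default selector that picks one random untried peer per call instead of shuffling the whole list. `PeerId` and `RandomTargetSelector` are hypothetical names introduced for this example, and `PeerId` stands in for `BlockManagerId`; only the `getNextPeer` signature follows the proposal.

    ```scala
    import scala.util.Random

    object NextPeerSketch {
      // Hypothetical stand-in for Spark's BlockManagerId.
      final case class PeerId(host: String, port: Int)

      // Same shape as the selector proposed in the comment above.
      trait ReplicationTargetSelector {
        def getNextPeer(
            candidatePeers: Set[PeerId],
            successfulReplications: Set[PeerId],
            failedReplications: Set[PeerId]): Option[PeerId]
      }

      // Picks a single random peer among those not yet tried.
      final class RandomTargetSelector(seed: Long) extends ReplicationTargetSelector {
        private val random = new Random(seed)

        def getNextPeer(
            candidatePeers: Set[PeerId],
            successfulReplications: Set[PeerId],
            failedReplications: Set[PeerId]): Option[PeerId] = {
          // Exclude peers that already hold the block or already failed.
          val remaining =
            (candidatePeers -- successfulReplications -- failedReplications).toIndexedSeq
          // Set iteration order is unspecified, so the seed alone does not make the pick reproducible.
          if (remaining.isEmpty) None
          else Some(remaining(random.nextInt(remaining.size)))
        }
      }
    }
    ```

    A caller would invoke `getNextPeer` once per replication attempt, adding the returned peer to the successful or failed set before the next call.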




[GitHub] spark pull request: [SPARK-15353] [CORE] Making peer selection for...

Posted by shubhamchopra <gi...@git.apache.org>.
Github user shubhamchopra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13152#discussion_r63780828
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1079,109 +1103,97 @@ private[spark] class BlockManager(
       }
     
       /**
    -   * Replicate block to another node. Not that this is a blocking call that returns after
    +   * Replicate block to another node. Note that this is a blocking call that returns after
        * the block has been replicated.
    +   *
    +   * @param blockId
    +   * @param data
    +   * @param level
    +   * @param classTag
        */
       private def replicate(
    -      blockId: BlockId,
    -      data: ChunkedByteBuffer,
    -      level: StorageLevel,
    -      classTag: ClassTag[_]): Unit = {
    +    blockId: BlockId,
    +    data: ChunkedByteBuffer,
    +    level: StorageLevel,
    +    classTag: ClassTag[_]): Unit = {
    +
         val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
    -    val numPeersToReplicateTo = level.replication - 1
    -    val peersForReplication = new ArrayBuffer[BlockManagerId]
    -    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    -    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
         val tLevel = StorageLevel(
           useDisk = level.useDisk,
           useMemory = level.useMemory,
           useOffHeap = level.useOffHeap,
           deserialized = level.deserialized,
           replication = 1)
    -    val startTime = System.currentTimeMillis
    -    val random = new Random(blockId.hashCode)
    -
    -    var replicationFailed = false
    -    var failures = 0
    -    var done = false
    -
    -    // Get cached list of peers
    -    peersForReplication ++= getPeers(forceFetch = false)
    -
    -    // Get a random peer. Note that this selection of a peer is deterministic on the block id.
    -    // So assuming the list of peers does not change and no replication failures,
    -    // if there are multiple attempts in the same node to replicate the same block,
    -    // the same set of peers will be selected.
    -    def getRandomPeer(): Option[BlockManagerId] = {
    -      // If replication had failed, then force update the cached list of peers and remove the peers
    -      // that have been already used
    -      if (replicationFailed) {
    -        peersForReplication.clear()
    -        peersForReplication ++= getPeers(forceFetch = true)
    -        peersForReplication --= peersReplicatedTo
    -        peersForReplication --= peersFailedToReplicateTo
    -      }
    -      if (!peersForReplication.isEmpty) {
    -        Some(peersForReplication(random.nextInt(peersForReplication.size)))
    -      } else {
    -        None
    -      }
    -    }
     
    -    // One by one choose a random peer and try uploading the block to it
    -    // If replication fails (e.g., target peer is down), force the list of cached peers
    -    // to be re-fetched from driver and then pick another random peer for replication. Also
    -    // temporarily black list the peer for which replication failed.
    -    //
    -    // This selection of a peer and replication is continued in a loop until one of the
    -    // following 3 conditions is fulfilled:
    -    // (i) specified number of peers have been replicated to
    -    // (ii) too many failures in replicating to peers
    -    // (iii) no peer left to replicate to
    -    //
    -    while (!done) {
    -      getRandomPeer() match {
    -        case Some(peer) =>
    -          try {
    -            val onePeerStartTime = System.currentTimeMillis
    -            logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    -            blockTransferService.uploadBlockSync(
    -              peer.host,
    -              peer.port,
    -              peer.executorId,
    -              blockId,
    -              new NettyManagedBuffer(data.toNetty),
    -              tLevel,
    -              classTag)
    -            logTrace(s"Replicated $blockId of ${data.size} bytes to $peer in %s ms"
    -              .format(System.currentTimeMillis - onePeerStartTime))
    -            peersReplicatedTo += peer
    -            peersForReplication -= peer
    -            replicationFailed = false
    -            if (peersReplicatedTo.size == numPeersToReplicateTo) {
    -              done = true  // specified number of peers have been replicated to
    +    val numPeersToReplicateTo = level.replication - 1
    +
    +    @tailrec def replicateBlock(
    +      numFailures: Int,
    +      peersForReplication: Seq[BlockManagerId],
    +      peersReplicatedTo: Set[BlockManagerId],
    +      peersFailedToReplicateTo: Set[BlockManagerId]): Set[BlockManagerId] = {
    +
    +      if (numFailures > maxReplicationFailures
    +                || peersForReplication.isEmpty
    +                || peersReplicatedTo.size == numPeersToReplicateTo) {
    +        // This selection of a peer and replication is continued in a loop until one of the
    +        // following 3 conditions is fulfilled:
    +        // (i) specified number of peers have been replicated to
    +        // (ii) too many failures in replicating to peers
    +        // (iii) no peer left to replicate to
    +        peersReplicatedTo
    +      } else {
    +        val peer = peersForReplication.head
    +        val (updatedNumFailures, updatedPeers, updatedReplicatedPeers, updatedFailedPeers) = try {
    +          val onePeerStartTime = System.currentTimeMillis
    +          logTrace(s"Trying to replicate $blockId of ${data.size} bytes to $peer")
    +          blockTransferService.uploadBlockSync(
    +            peer.host,
    +            peer.port,
    +            peer.executorId,
    +            blockId,
    +            new NettyManagedBuffer(data.toNetty),
    +            tLevel,
    +            classTag)
    +          logTrace(s"Replicated $blockId of ${data.size} bytes to $peer" +
    +            s" in ${System.currentTimeMillis - onePeerStartTime} ms")
    +          // the block was replicated, lets update state and move ahead
    +          (numFailures,
    +            peersForReplication.tail,
    +            peersReplicatedTo + peer,
    +            peersFailedToReplicateTo)
    +        } catch {
    +          case e: Exception =>
    --- End diff --
    
    The intended behavior is to keep trying the remaining peers whenever an exception is thrown here, until we either run out of peers or exceed the allowed number of failures (`spark.storage.maxReplicationFailures`). In that sense the method does "best-effort" replication. The earlier implementation behaved the same way.

