You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by mateiz <gi...@git.apache.org> on 2015/09/21 00:28:20 UTC

[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

GitHub user mateiz opened a pull request:

    https://github.com/apache/spark/pull/8844

    [SPARK-9852] Let reduce tasks fetch multiple map output partitions

    This makes two changes:
    
    - Allow reduce tasks to fetch multiple map output partitions -- this is a pretty small change to HashShuffleFetcher
    - Move shuffle locality computation out of DAGScheduler and into ShuffledRDD / MapOutputTracker; this was needed because the code in DAGScheduler wouldn't work for RDDs that fetch multiple map output partitions from each reduce task
    
    I also added an AdaptiveSchedulingSuite that creates RDDs depending on multiple map output partitions.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mateiz/spark spark-9852

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8844.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8844
    
----
commit cbf6a5a78c419b8bb26e6259849f1c35a2d31edb
Author: Matei Zaharia <ma...@databricks.com>
Date:   2015-08-13T23:35:49Z

    Allow HashShuffleReader to fetch multiple partitions

commit 8f42d5c036b9b0985c89cdcf43f66f9f5eec6f3f
Author: Matei Zaharia <ma...@databricks.com>
Date:   2015-08-20T22:13:54Z

    Compute reduce locality only for ShuffledRDD and its SQL counterpart

commit e4a6f5f547788d03c1e0a373fc2f091571c1d12b
Author: Matei Zaharia <ma...@databricks.com>
Date:   2015-08-20T22:23:05Z

    More testing

commit 9ab02f17bae711dbd2e3979f8e64863fb84cbd81
Author: Matei Zaharia <ma...@databricks.com>
Date:   2015-09-20T16:09:07Z

    Fix compile

commit f4d2519bc3467d595dcd293a2b54157664222630
Author: Matei Zaharia <ma...@databricks.com>
Date:   2015-09-20T22:24:34Z

    Some test fixes

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-141841910
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-141852575
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42739/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/8844


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-141852539
  
      [Test build #42739 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42739/console) for   PR 8844 at commit [`f4d2519`](https://github.com/apache/spark/commit/f4d2519bc3467d595dcd293a2b54157664222630).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142703433
  
    Alright, I made the suggested changes. I don't think we need to make those classes `private[spark]` because they are in `src/test`, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142831032
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by shivaram <gi...@git.apache.org>.
Github user shivaram commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r40043438
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -323,6 +351,30 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       }
     
       /**
    +   * Return the preferred hosts on which to run the given map output partition in a given shuffle,
    +   * i.e. the nodes that the most outputs for that partition are on.
    +   *
    +   * @param dep shuffle dependency object
    +   * @param partitionId map output partition that we want to read
    +   * @return a sequence of host names
    +   */
    +  def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
    +      : Seq[String] = {
    +    if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
    +        dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
    +      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
    +        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
    +      if (blockManagerIds.nonEmpty) {
    +        blockManagerIds.get.map(_.host)
    --- End diff --
    
    I don't think its very important given that our shuffle implementations write out data to local disk --  It might make a difference if we stored shuffle data in the block manager (deserialized) but given the status quo I think executor-level locality doesn't really matter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142746148
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42917/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142704410
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142746043
  
      [Test build #42917 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42917/console) for   PR 8844 at commit [`9eac9d6`](https://github.com/apache/spark/commit/9eac9d651d52bd0b5fbe797f7d9b6da4f7c9b6a4).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r40170084
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import java.util.Arrays
    +
    +import org.apache.spark._
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * A Partitioner that might group together one or more partitions from the parent.
    + *
    + * @param parent a parent partitioner
    + * @param partitionStartIndices indices of partitions in parent that should create new partitions
    + *   in child (this should be an array of increasing partition IDs). For example, if we have a
    + *   parent with 5 partitions, and partitionStartIndices is [0, 2, 4], we get three output
    + *   partitions, corresponding to partition ranges [0, 1], [2, 3] and [4] of the parent partitioner.
    + */
    +class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: Array[Int])
    --- End diff --
    
    This is a test class


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r40027092
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import java.util.Arrays
    +
    +import org.apache.spark._
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * A Partitioner that might group together one or more partitions from the parent.
    + *
    + * @param parent a parent partitioner
    + * @param partitionStartIndices indices of partitions in parent that should create new partitions
    + *   in child (this should be an array of increasing partition IDs). For example, if we have a
    + *   parent with 5 partitions, and partitionStartIndices is [0, 2, 4], we get three output
    + *   partitions, corresponding to partition ranges [0, 1], [2, 3] and [4] of the parent partitioner.
    + */
    +class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: Array[Int])
    --- End diff --
    
    `private[spark]`? Or should this be `@DeveloperAPI`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142704459
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-141852573
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r40025584
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -134,11 +134,24 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
        */
       def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
           : Seq[(BlockManagerId, Seq[(BlockId, Long)])] = {
    -    logDebug(s"Fetching outputs for shuffle $shuffleId, reduce $reduceId")
    +    getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
    +  }
    +
    +  /**
    +   * Called from executors to get the server URIs and output sizes for each shuffle block that
    +   * needs to be read from a given range of map output partitions.
    +   *
    +   * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId,
    +   *         and the second item is a sequence of (shuffle block id, shuffle block size) tuples
    +   *         describing the shuffle blocks that are stored at that block manager.
    +   */
    +  def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
    --- End diff --
    
    The Scaladoc here should probably explain whether the range includes the `endPartition` endpoint.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r39988247
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -323,6 +351,30 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       }
     
       /**
    +   * Return the preferred hosts on which to run the given map output partition in a given shuffle,
    +   * i.e. the nodes that the most outputs for that partition are on.
    +   *
    +   * @param dep shuffle dependency object
    +   * @param partitionId map output partition that we want to read
    +   * @return a sequence of host names
    +   */
    +  def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
    +      : Seq[String] = {
    +    if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
    +        dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
    +      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
    +        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
    +      if (blockManagerIds.nonEmpty) {
    +        blockManagerIds.get.map(_.host)
    --- End diff --
    
    The previous implementation can use the executor id to schedule tasks. It could be more effective when there are multiple executors in the same host. Is it possible to support it in the new implementation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r40027112
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/CustomShuffledRDD.scala ---
    @@ -0,0 +1,111 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler
    +
    +import java.util.Arrays
    +
    +import org.apache.spark._
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * A Partitioner that might group together one or more partitions from the parent.
    + *
    + * @param parent a parent partitioner
    + * @param partitionStartIndices indices of partitions in parent that should create new partitions
    + *   in child (this should be an array of increasing partition IDs). For example, if we have a
    + *   parent with 5 partitions, and partitionStartIndices is [0, 2, 4], we get three output
    + *   partitions, corresponding to partition ranges [0, 1], [2, 3] and [4] of the parent partitioner.
    + */
    +class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: Array[Int])
    +  extends Partitioner {
    +
    +  @transient private lazy val parentPartitionMapping: Array[Int] = {
    +    val n = parent.numPartitions
    +    val result = new Array[Int](n)
    +    for (i <- 0 until partitionStartIndices.length) {
    +      val start = partitionStartIndices(i)
    +      val end = if (i < partitionStartIndices.length - 1) partitionStartIndices(i + 1) else n
    +      for (j <- start until end) {
    +        result(j) = i
    +      }
    +    }
    +    result
    +  }
    +
    +  override def numPartitions: Int = partitionStartIndices.size
    +
    +  override def getPartition(key: Any): Int = {
    +    parentPartitionMapping(parent.getPartition(key))
    +  }
    +
    +  override def equals(other: Any): Boolean = other match {
    +    case c: CoalescedPartitioner =>
    +      c.parent == parent && Arrays.equals(c.partitionStartIndices, partitionStartIndices)
    +    case _ =>
    +      false
    +  }
    +}
    +
    +private[spark] class CustomShuffledRDDPartition(
    +    val index: Int, val startIndexInParent: Int, val endIndexInParent: Int)
    +  extends Partition {
    +
    +  override def hashCode(): Int = index
    +}
    +
    +/**
    + * A special ShuffledRDD that supports a ShuffleDependency object from outside and launching reduce
    + * tasks that read multiple map output partitions.
    + */
    +class CustomShuffledRDD[K, V, C](
    --- End diff --
    
    Same comment here: should this be `private[spark]`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142705877
  
      [Test build #42917 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42917/consoleFull) for   PR 8844 at commit [`9eac9d6`](https://github.com/apache/spark/commit/9eac9d651d52bd0b5fbe797f7d9b6da4f7c9b6a4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r39935722
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -474,9 +495,9 @@ class DAGSchedulerSuite
     
       test("run trivial shuffle with fetch failure") {
         val shuffleMapRdd = new MyRDD(sc, 2, Nil)
    -    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    +    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
    --- End diff --
    
    These changes were needed because the new MapOutputTracker.getPreferredLocationsForShuffle figures out the number of partitions from the shuffle dependency's partitioner, so we could no longer pass in null as the partitioner.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-141841907
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-141841616
  
    @shivaram, @JoshRosen, @zsxwing this may be relevant to you


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8844#discussion_r40026880
  
    --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
    @@ -323,6 +351,30 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       }
     
       /**
    +   * Return the preferred hosts on which to run the given map output partition in a given shuffle,
    +   * i.e. the nodes that the most outputs for that partition are on.
    +   *
    +   * @param dep shuffle dependency object
    +   * @param partitionId map output partition that we want to read
    +   * @return a sequence of host names
    +   */
    +  def getPreferredLocationsForShuffle(dep: ShuffleDependency[_, _, _], partitionId: Int)
    +      : Seq[String] = {
    +    if (shuffleLocalityEnabled && dep.rdd.partitions.length < SHUFFLE_PREF_MAP_THRESHOLD &&
    +        dep.partitioner.numPartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
    +      val blockManagerIds = getLocationsWithLargestOutputs(dep.shuffleId, partitionId,
    +        dep.partitioner.numPartitions, REDUCER_PREF_LOCS_FRACTION)
    +      if (blockManagerIds.nonEmpty) {
    +        blockManagerIds.get.map(_.host)
    --- End diff --
    
    Yeah, that's a good point, we lose that because of the design of RDD.getPreferredLocations. I'd actually consider extending that API to allow passing executor IDs, but probably in a separate JIRA. @shivaram how important do you think this is?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142802670
  
    Alright, let me know if you guys have any other comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142746146
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-143113277
  
    Alright, merged this, thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-142703664
  
    Yep, sorry for overlooking that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-9852] Let reduce tasks fetch multiple m...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8844#issuecomment-141842178
  
      [Test build #42739 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42739/consoleFull) for   PR 8844 at commit [`f4d2519`](https://github.com/apache/spark/commit/f4d2519bc3467d595dcd293a2b54157664222630).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org