You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by mateiz <gi...@git.apache.org> on 2014/08/06 05:28:30 UTC

[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

GitHub user mateiz opened a pull request:

    https://github.com/apache/spark/pull/1799

    SPARK-2787: Make sort-based shuffle write files directly when there's no sorting/aggregation and # partitions is small

    As described in https://issues.apache.org/jira/browse/SPARK-2787, right now sort-based shuffle is more expensive than hash-based for map operations that do no partial aggregation or sorting, such as groupByKey. This is because it has to serialize each data item twice (once when spilling to intermediate files, and then again when merging these files object-by-object). This patch adds a code path to just write separate files directly if the # of output partitions is small, and concatenate them at the end to produce a sorted file.
    
    On the unit test side, I added some tests that force or don't force this bypass path to be used, and checked that our tests for other features (e.g. all the operations) cover both cases.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mateiz/spark SPARK-2787

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1799.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1799
    
----
commit a42a102f0f05b01f129947c3ead2cd0674f7ea2e
Author: Matei Zaharia <ma...@databricks.com>
Date:   2014-08-05T02:25:49Z

    Move existing logic for writing partitioned files into ExternalSorter
    
    Also renamed ExternalSorter.write(Iterator) to insertAll, to match
    ExternalAppendOnlyMap

commit 82b187a56c1e115f5e4c7d5beed8d3deb6819a77
Author: Matei Zaharia <ma...@databricks.com>
Date:   2014-08-06T03:10:22Z

    Add code path to bypass merge-sort in ExternalSorter, and tests

commit f401c78638a85a87a06d4bf6d880bf9f7b9c1f4a
Author: Matei Zaharia <ma...@databricks.com>
Date:   2014-08-06T03:12:06Z

    Fix some comments

commit 2afb4122021b3c7655a1e39ab9e11499f5cb3e18
Author: Matei Zaharia <ma...@databricks.com>
Date:   2014-08-06T03:27:17Z

    Add docs for shuffle manager properties, and allow short names for them

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51552406
  
    I'm merging this in master & branch-1.1 (since sort-based is disabled by default)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51300244
  
    QA tests have started for PR 1799. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18009/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51377820
  
    QA tests have started for PR 1799. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18036/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51384723
  
    QA results for PR 1799:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18036/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51552301
  
    @rxin does this look okay?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1799#discussion_r15898826
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -640,9 +713,122 @@ private[spark] class ExternalSorter[K, V, C](
        */
       def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
     
    +  /**
    +   * Write all the data added into this ExternalSorter into a file in the disk store, creating
    +   * an .index file for it as well with the offsets of each partition. This is called by the
    +   * SortShuffleWriter and can go through an efficient path of just concatenating binary files
    +   * if we decided to avoid merge-sorting.
    +   *
    +   * @param blockId block ID to write to. The index file will be blockId.name + ".index".
    +   * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
    +   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
    +   */
    +  def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = {
    +    val outputFile = blockManager.diskBlockManager.getFile(blockId)
    +
    +    // Track location of each range in the output file
    +    val offsets = new Array[Long](numPartitions + 1)
    +    val lengths = new Array[Long](numPartitions)
    +
    +    // Statistics
    +    var totalBytes = 0L
    +    var totalTime = 0L
    +
    +    if (bypassMergeSort && partitionWriters != null) {
    +      // We decided to write separate files for each partition, so just concatenate them. To keep
    +      // this simple we spill out the current in-memory collection so that everything is in files.
    +      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
    +      partitionWriters.foreach(_.commitAndClose())
    +      var out: FileOutputStream = null
    +      var in: FileInputStream = null
    +      try {
    +        out = new FileOutputStream(outputFile)
    +        for (i <- 0 until numPartitions) {
    +          val file = partitionWriters(i).fileSegment().file
    --- End diff --
    
    I find this part that uses fileSegments slightly convoluted. But we can deal with this later. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51299104
  
    QA results for PR 1799:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18000/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1799#discussion_r15897877
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -246,8 +250,13 @@ object SparkEnv extends Logging {
           "."
         }
     
    -    val shuffleManager = instantiateClass[ShuffleManager](
    -      "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
    +    // Let the user specify short names for shuffle managers
    +    val shortShuffleMgrNames = Map(
    +      "HASH" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    +      "SORT" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    +    val shuffleMgrName = conf.get("spark.shuffle.manager", "HASH")
    --- End diff --
    
    can we make this case insensitive?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1799#discussion_r15898462
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -640,9 +713,122 @@ private[spark] class ExternalSorter[K, V, C](
        */
       def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
     
    +  /**
    +   * Write all the data added into this ExternalSorter into a file in the disk store, creating
    +   * an .index file for it as well with the offsets of each partition. This is called by the
    +   * SortShuffleWriter and can go through an efficient path of just concatenating binary files
    +   * if we decided to avoid merge-sorting.
    +   *
    +   * @param blockId block ID to write to. The index file will be blockId.name + ".index".
    +   * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
    +   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
    +   */
    +  def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = {
    +    val outputFile = blockManager.diskBlockManager.getFile(blockId)
    +
    +    // Track location of each range in the output file
    +    val offsets = new Array[Long](numPartitions + 1)
    +    val lengths = new Array[Long](numPartitions)
    +
    +    // Statistics
    +    var totalBytes = 0L
    +    var totalTime = 0L
    +
    +    if (bypassMergeSort && partitionWriters != null) {
    --- End diff --
    
    In the comment above partitionWriters, we say: "Array of file writers for each partition, used if bypassMergeSort is true" .
    
    This implies that when bypassMergeSort is true, partitionWriters wouldn't be null. Can you document the case when partitionWriters would be null?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51425340
  
    QA tests have started for PR 1799. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18096/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51380951
  
    Ah cool, glad it's being fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51552456
  
    Alright, thanks. Going to merge it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1799#discussion_r15898020
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---
    @@ -54,87 +55,36 @@ private[spark] class SortShuffleWriter[K, V, C](
     
       /** Write a bunch of records to this task's output */
       override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    -    // Get an iterator with the elements for each partition ID
    -    val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
    -      if (dep.mapSideCombine) {
    -        if (!dep.aggregator.isDefined) {
    -          throw new IllegalStateException("Aggregator is empty for map-side combine")
    -        }
    -        sorter = new ExternalSorter[K, V, C](
    -          dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    -        sorter.write(records)
    -        sorter.partitionedIterator
    -      } else {
    -        // In this case we pass neither an aggregator nor an ordering to the sorter, because we
    -        // don't care whether the keys get sorted in each partition; that will be done on the
    -        // reduce side if the operation being run is sortByKey.
    -        sorter = new ExternalSorter[K, V, V](
    -          None, Some(dep.partitioner), None, dep.serializer)
    -        sorter.write(records)
    -        sorter.partitionedIterator
    +    if (dep.mapSideCombine) {
    +      if (!dep.aggregator.isDefined) {
    +        throw new IllegalStateException("Aggregator is empty for map-side combine")
           }
    +      sorter = new ExternalSorter[K, V, C](
    +        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    +      sorter.insertAll(records)
    +    } else {
    +      // In this case we pass neither an aggregator nor an ordering to the sorter, because we
    +      // don't care whether the keys get sorted in each partition; that will be done on the
    +      // reduce side if the operation being run is sortByKey.
    +      sorter = new ExternalSorter[K, V, V](
    --- End diff --
    
    I think this fits in one line without wrapping ...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1799#discussion_r16029607
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -246,8 +250,13 @@ object SparkEnv extends Logging {
           "."
         }
     
    -    val shuffleManager = instantiateClass[ShuffleManager](
    -      "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
    +    // Let the user specify short names for shuffle managers
    +    val shortShuffleMgrNames = Map(
    +      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    +      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    +    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
    --- End diff --
    
    I ran into a problem using these short names: in ShuffleBlockManager, there's a line that looks at the `spark.shuffle.manager` property to see whether we're using sort-based shuffle:
    
    ```scala
      // Are we using sort-based shuffle?
      val sortBasedShuffle =
        conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName
    ```
    
    This won't work properly if the configuration property is set to one of the short names.
    
    We can't just re-assign the property to the full name because the BlockManager will have already been created by this point and it will have created the ShuffleBlockManager with the wrong property value.  Similarly, the ShuffleBlockManager can't access SparkEnv to inspect the actual ShuffleManager because it won't be fully initialized.
    
    I think we should perform all configuration normalization / mutation at a single top-level location and then treat the configuration as immutable from that point forward, since that seems easier to reason about.  What do you think about moving the aliasing  / normalization to the top of SparkEnv?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51425061
  
    Thanks; updated to deal with comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51295773
  
    test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51303731
  
    QA results for PR 1799:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18009/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51428187
  
    QA results for PR 1799:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18096/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51378648
  
    Yeah the flaky tests are fixed here #1803


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51291087
  
    QA results for PR 1799:<br>- This patch FAILED unit tests.<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17985/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51552394
  
    LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1799#discussion_r15915999
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -640,9 +713,122 @@ private[spark] class ExternalSorter[K, V, C](
        */
       def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
     
    +  /**
    +   * Write all the data added into this ExternalSorter into a file in the disk store, creating
    +   * an .index file for it as well with the offsets of each partition. This is called by the
    +   * SortShuffleWriter and can go through an efficient path of just concatenating binary files
    +   * if we decided to avoid merge-sorting.
    +   *
    +   * @param blockId block ID to write to. The index file will be blockId.name + ".index".
    +   * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
    +   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
    +   */
    +  def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = {
    +    val outputFile = blockManager.diskBlockManager.getFile(blockId)
    +
    +    // Track location of each range in the output file
    +    val offsets = new Array[Long](numPartitions + 1)
    +    val lengths = new Array[Long](numPartitions)
    +
    +    // Statistics
    +    var totalBytes = 0L
    +    var totalTime = 0L
    +
    +    if (bypassMergeSort && partitionWriters != null) {
    +      // We decided to write separate files for each partition, so just concatenate them. To keep
    +      // this simple we spill out the current in-memory collection so that everything is in files.
    +      spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
    +      partitionWriters.foreach(_.commitAndClose())
    +      var out: FileOutputStream = null
    +      var in: FileInputStream = null
    +      try {
    +        out = new FileOutputStream(outputFile)
    +        for (i <- 0 until numPartitions) {
    +          val file = partitionWriters(i).fileSegment().file
    --- End diff --
    
    Yeah unfortunately that's just how you get the file


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1799


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1799#discussion_r15898996
  
    --- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
    @@ -120,6 +128,18 @@ private[spark] class ExternalSorter[K, V, C](
       // How much of the shared memory pool this collection has claimed
       private var myMemoryThreshold = 0L
     
    +  // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
    +  // local aggregation and sorting, write numPartitions files directly and just concatenate them
    +  // at the end. This avoids doing serialization and deserialization twice to merge together the
    +  // spilled files, which would happen with the normal code path. The downside is having multiple
    +  // files open at a time and thus more memory allocated to buffers.
    +  private val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    +  private[collection] val bypassMergeSort =            // private[collection] for unit tests
    --- End diff --
    
    You can use this in unit test: http://doc.scalatest.org/1.4.1/org/scalatest/PrivateMethodTester.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51296001
  
    QA tests have started for PR 1799. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18000/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51300053
  
    test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51377222
  
    @rxin / @andrewor14 would be good if you review this when you have a chance. This is something we should add in 1.1 since sort-based shuffle is still off by default.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51295530
  
    QA tests have started for PR 1799. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17999/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1799#discussion_r16033699
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -246,8 +250,13 @@ object SparkEnv extends Logging {
           "."
         }
     
    -    val shuffleManager = instantiateClass[ShuffleManager](
    -      "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
    +    // Let the user specify short names for shuffle managers
    +    val shortShuffleMgrNames = Map(
    +      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    +      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    +    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
    --- End diff --
    
    I'd rather not change the configuration under the user, that would be confusing if they later print it or look in the web UI. Instead, maybe add a SparkEnv.getShuffleManagerClass(conf: SparkConf) that can return the real class name.
    
    Also I'd be fine initializing the ShuffleBlockManager after the ShuffleManager if that works, and using isInstanceOf. That would be the cleanest.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51377274
  
    BTW the test failures both time were in a Flume test for streaming, which might just be flaky.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51376950
  
    test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1799#issuecomment-51290081
  
    QA tests have started for PR 1799. This patch DID NOT merge cleanly! <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17985/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org