Posted to reviews@spark.apache.org by mateiz <gi...@git.apache.org> on 2014/08/06 05:28:30 UTC
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
GitHub user mateiz opened a pull request:
https://github.com/apache/spark/pull/1799
SPARK-2787: Make sort-based shuffle write files directly when there's no sorting/aggregation and # partitions is small
As described in https://issues.apache.org/jira/browse/SPARK-2787, sort-based shuffle is currently more expensive than hash-based shuffle for map operations that do no partial aggregation or sorting, such as groupByKey. This is because it has to serialize each data item twice: once when spilling to intermediate files, and again when merging those files object by object. This patch adds a code path that, when the number of output partitions is small, writes a separate file for each partition directly and concatenates them at the end to produce a single file sorted by partition ID.
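Here is a minimal Scala sketch of the bypass idea. It assumes each partition's records have already been serialized into their own file. The final output is then produced by copying raw bytes in partition-ID order, so nothing is deserialized or re-serialized. `ConcatSketch` and `concatenatePartitionFiles` are hypothetical names, not part of Spark.

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files

// Hypothetical helper illustrating the bypass path: per-partition files are
// concatenated byte-for-byte, without deserializing any records.
object ConcatSketch {
  /** Appends each file (given in partition-ID order) to `output`; returns bytes written per partition. */
  def concatenatePartitionFiles(partitionFiles: Seq[File], output: File): Array[Long] = {
    val lengths = new Array[Long](partitionFiles.length)
    val out = new FileOutputStream(output)
    try {
      for ((file, i) <- partitionFiles.zipWithIndex) {
        // Files.copy(Path, OutputStream) streams the raw bytes and returns how many were copied
        lengths(i) = Files.copy(file.toPath, out)
      }
    } finally {
      out.close()
    }
    lengths
  }
}
```

The returned per-partition byte counts are the kind of lengths a map output tracker needs. A real implementation would also record shuffle metrics and handle partial-write failures.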
On the unit-test side, I added tests that either force or prevent the use of this bypass path, and checked that our tests for other features (e.g. all the operations) cover both cases.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mateiz/spark SPARK-2787
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1799.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1799
----
commit a42a102f0f05b01f129947c3ead2cd0674f7ea2e
Author: Matei Zaharia <ma...@databricks.com>
Date: 2014-08-05T02:25:49Z
Move existing logic for writing partitioned files into ExternalSorter
Also renamed ExternalSorter.write(Iterator) to insertAll, to match
ExternalAppendOnlyMap
commit 82b187a56c1e115f5e4c7d5beed8d3deb6819a77
Author: Matei Zaharia <ma...@databricks.com>
Date: 2014-08-06T03:10:22Z
Add code path to bypass merge-sort in ExternalSorter, and tests
commit f401c78638a85a87a06d4bf6d880bf9f7b9c1f4a
Author: Matei Zaharia <ma...@databricks.com>
Date: 2014-08-06T03:12:06Z
Fix some comments
commit 2afb4122021b3c7655a1e39ab9e11499f5cb3e18
Author: Matei Zaharia <ma...@databricks.com>
Date: 2014-08-06T03:27:17Z
Add docs for shuffle manager properties, and allow short names for them
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51552406
I'm merging this into master and branch-1.1 (since sort-based shuffle is disabled by default).
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51300244
QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18009/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51377820
QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18036/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51384723
QA results for PR 1799:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18036/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51552301
@rxin does this look okay?
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/1799#discussion_r15898826
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -640,9 +713,122 @@ private[spark] class ExternalSorter[K, V, C](
*/
def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
+ /**
+ * Write all the data added into this ExternalSorter into a file in the disk store, creating
+ * an .index file for it as well with the offsets of each partition. This is called by the
+ * SortShuffleWriter and can go through an efficient path of just concatenating binary files
+ * if we decided to avoid merge-sorting.
+ *
+ * @param blockId block ID to write to. The index file will be blockId.name + ".index".
+ * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
+ * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
+ */
+ def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = {
+ val outputFile = blockManager.diskBlockManager.getFile(blockId)
+
+ // Track location of each range in the output file
+ val offsets = new Array[Long](numPartitions + 1)
+ val lengths = new Array[Long](numPartitions)
+
+ // Statistics
+ var totalBytes = 0L
+ var totalTime = 0L
+
+ if (bypassMergeSort && partitionWriters != null) {
+ // We decided to write separate files for each partition, so just concatenate them. To keep
+ // this simple we spill out the current in-memory collection so that everything is in files.
+ spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
+ partitionWriters.foreach(_.commitAndClose())
+ var out: FileOutputStream = null
+ var in: FileInputStream = null
+ try {
+ out = new FileOutputStream(outputFile)
+ for (i <- 0 until numPartitions) {
+ val file = partitionWriters(i).fileSegment().file
--- End diff --
I find this part that uses fileSegments slightly convoluted. But we can deal with this later.
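The `writePartitionedFile` docstring above says it returns per-partition lengths and also writes an `.index` file with each partition's offsets (note the `numPartitions + 1` offsets array). Below is a hedged sketch of one way to derive offsets from lengths and store them. It assumes the index is simply a sequence of big-endian longs. The object and method names are hypothetical, and the on-disk layout is an assumption that this thread does not confirm.

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

object IndexSketch {
  /** Prefix sums: offsets(i) is where partition i starts; the last entry is the total size. */
  def offsetsFromLengths(lengths: Array[Long]): Array[Long] = {
    val offsets = new Array[Long](lengths.length + 1) // offsets(0) stays 0
    for (i <- lengths.indices) offsets(i + 1) = offsets(i) + lengths(i)
    offsets
  }

  /** Writes every offset as an 8-byte big-endian long (assumed layout). */
  def writeIndexFile(indexFile: File, lengths: Array[Long]): Unit = {
    val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
    try offsetsFromLengths(lengths).foreach(offset => out.writeLong(offset))
    finally out.close()
  }

  /** Reads all offsets back; partition i occupies bytes [offsets(i), offsets(i + 1)) of the data file. */
  def readOffsets(indexFile: File): Array[Long] = {
    val in = new DataInputStream(new BufferedInputStream(new FileInputStream(indexFile)))
    try Array.fill((indexFile.length / 8).toInt)(in.readLong())
    finally in.close()
  }
}
```

Storing `n + 1` offsets rather than `n` lengths lets a reader locate any partition's byte range with two lookups and no summation.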
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51299104
QA results for PR 1799:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18000/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/1799#discussion_r15897877
--- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
@@ -246,8 +250,13 @@ object SparkEnv extends Logging {
"."
}
- val shuffleManager = instantiateClass[ShuffleManager](
- "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+ // Let the user specify short names for shuffle managers
+ val shortShuffleMgrNames = Map(
+ "HASH" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+ "SORT" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+ val shuffleMgrName = conf.get("spark.shuffle.manager", "HASH")
--- End diff --
can we make this case insensitive?
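One way to handle the case-insensitivity request is a standalone helper, sketched below. It stores the aliases in lower case and normalizes the user's value with `Locale.ROOT` before the lookup. Any value that is not an alias is treated as a fully qualified class name. `ShuffleManagerNames.resolve` is a hypothetical name; the alias map is copied from the diff.

```scala
import java.util.Locale

object ShuffleManagerNames {
  // Aliases from the diff above, keyed in lower case so lookups can ignore case
  private val shortNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  /** Maps a short name (any case) to its class name; other values are returned unchanged. */
  def resolve(name: String): String =
    shortNames.getOrElse(name.toLowerCase(Locale.ROOT), name)
}
```

Using `Locale.ROOT` keeps the lowercasing independent of the JVM's default locale. For example, under a Turkish locale, `"HASH".toLowerCase()` would not produce `"hash"`.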
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/1799#discussion_r15898462
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -640,9 +713,122 @@ private[spark] class ExternalSorter[K, V, C](
*/
def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
+ /**
+ * Write all the data added into this ExternalSorter into a file in the disk store, creating
+ * an .index file for it as well with the offsets of each partition. This is called by the
+ * SortShuffleWriter and can go through an efficient path of just concatenating binary files
+ * if we decided to avoid merge-sorting.
+ *
+ * @param blockId block ID to write to. The index file will be blockId.name + ".index".
+ * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
+ * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
+ */
+ def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = {
+ val outputFile = blockManager.diskBlockManager.getFile(blockId)
+
+ // Track location of each range in the output file
+ val offsets = new Array[Long](numPartitions + 1)
+ val lengths = new Array[Long](numPartitions)
+
+ // Statistics
+ var totalBytes = 0L
+ var totalTime = 0L
+
+ if (bypassMergeSort && partitionWriters != null) {
--- End diff --
In the comment above partitionWriters, we say: "Array of file writers for each partition, used if bypassMergeSort is true".
This implies that when bypassMergeSort is true, partitionWriters wouldn't be null. Can you document the case in which partitionWriters would be null?
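Here is one plausible reason `partitionWriters` can be null even in bypass mode. It is offered as an assumption, not something this thread confirms: the writers are created lazily on the first spill, so a task that never spills has none. The toy sketch below models that pattern, with `StringBuilder` standing in for real file writers. `LazyPartitionWriters` is hypothetical.

```scala
// Toy model (assumption): writers are allocated on the first spill, not up front.
class LazyPartitionWriters(numPartitions: Int, bypassMergeSort: Boolean) {
  // Stays null until spill() runs for the first time, even when bypassMergeSort is true
  private var partitionWriters: Array[StringBuilder] = null

  /** Appends (partitionId, value) records to the matching per-partition "writer". */
  def spill(records: Seq[(Int, String)]): Unit = {
    require(bypassMergeSort, "per-partition writers are only used in bypass mode")
    if (partitionWriters == null) {
      partitionWriters = Array.fill(numPartitions)(new StringBuilder)
    }
    records.foreach { case (partition, value) => partitionWriters(partition).append(value) }
  }

  /** Concatenation is only possible if bypass is on AND the writers were actually created. */
  def canConcatenate: Boolean = bypassMergeSort && partitionWriters != null
}
```

Under this model, a task whose data fits in memory never creates the writers, so it would have to fall back to the regular in-memory path.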
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51425340
QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18096/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51380951
Ah cool, glad it's being fixed.
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51552456
Alright, thanks. Going to merge it.
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/1799#discussion_r15898020
--- Diff: core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala ---
@@ -54,87 +55,36 @@ private[spark] class SortShuffleWriter[K, V, C](
/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
- // Get an iterator with the elements for each partition ID
- val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = {
- if (dep.mapSideCombine) {
- if (!dep.aggregator.isDefined) {
- throw new IllegalStateException("Aggregator is empty for map-side combine")
- }
- sorter = new ExternalSorter[K, V, C](
- dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
- sorter.write(records)
- sorter.partitionedIterator
- } else {
- // In this case we pass neither an aggregator nor an ordering to the sorter, because we
- // don't care whether the keys get sorted in each partition; that will be done on the
- // reduce side if the operation being run is sortByKey.
- sorter = new ExternalSorter[K, V, V](
- None, Some(dep.partitioner), None, dep.serializer)
- sorter.write(records)
- sorter.partitionedIterator
+ if (dep.mapSideCombine) {
+ if (!dep.aggregator.isDefined) {
+ throw new IllegalStateException("Aggregator is empty for map-side combine")
}
+ sorter = new ExternalSorter[K, V, C](
+ dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
+ sorter.insertAll(records)
+ } else {
+ // In this case we pass neither an aggregator nor an ordering to the sorter, because we
+ // don't care whether the keys get sorted in each partition; that will be done on the
+ // reduce side if the operation being run is sortByKey.
+ sorter = new ExternalSorter[K, V, V](
--- End diff --
I think this fits in one line without wrapping ...
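The refactored `write` in the diff above chooses the sorter's arguments based on `dep.mapSideCombine`. Below is a simplified, hypothetical sketch of that decision, with generic type parameters standing in for Spark's `Aggregator` and `Ordering` types. With map-side combine, an aggregator is mandatory and the key ordering is passed along. Without it, neither is passed, which is the case the new bypass path targets (together with a small partition count).

```scala
// Hypothetical stand-in for the sorter's constructor arguments
final case class SorterArgs[A, O](aggregator: Option[A], ordering: Option[O])

object SorterChoice {
  def chooseSorterArgs[A, O](
      mapSideCombine: Boolean,
      aggregator: Option[A],
      keyOrdering: Option[O]): SorterArgs[A, O] =
    if (mapSideCombine) {
      // Same check as in the diff: combining on the map side needs an aggregator
      if (aggregator.isEmpty) throw new IllegalStateException("Aggregator is empty for map-side combine")
      SorterArgs(aggregator, keyOrdering)
    } else {
      // No aggregator and no ordering: any key sorting happens on the reduce side
      SorterArgs(None, None)
    }
}
```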
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1799#discussion_r16029607
--- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
@@ -246,8 +250,13 @@ object SparkEnv extends Logging {
"."
}
- val shuffleManager = instantiateClass[ShuffleManager](
- "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+ // Let the user specify short names for shuffle managers
+ val shortShuffleMgrNames = Map(
+ "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+ "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+ val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
--- End diff --
I ran into a problem using these short names: in ShuffleBlockManager, there's a line that looks at the `spark.shuffle.manager` property to see whether we're using sort-based shuffle:
```scala
// Are we using sort-based shuffle?
val sortBasedShuffle =
  conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName
```
This won't work properly if the configuration property is set to one of the short names.
We can't just re-assign the property to the full name because the BlockManager will have already been created by this point and it will have created the ShuffleBlockManager with the wrong property value. Similarly, the ShuffleBlockManager can't access SparkEnv to inspect the actual ShuffleManager because it won't be fully initialized.
I think we should perform all configuration normalization / mutation at a single top-level location and then treat the configuration as immutable from that point forward, since that seems easier to reason about. What do you think about moving the aliasing / normalization to the top of SparkEnv?
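The sketch below shows the normalization proposed above. The alias is resolved once, up front, and every later component receives a configuration that already holds the full class name. That way, string comparisons like the one in `ShuffleBlockManager` keep working. A plain `Map[String, String]` stands in for `SparkConf` here, and `ConfNormalization` is a hypothetical name.

```scala
import java.util.Locale

object ConfNormalization {
  private val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  /** Returns a copy of conf in which spark.shuffle.manager, if set, holds a full class name. */
  def normalize(conf: Map[String, String]): Map[String, String] =
    conf.get("spark.shuffle.manager") match {
      case Some(name) =>
        val fullName = shortShuffleMgrNames.getOrElse(name.toLowerCase(Locale.ROOT), name)
        conf.updated("spark.shuffle.manager", fullName)
      case None => conf
    }

  /** A downstream check that is only correct on an already-normalized conf. */
  def isSortBasedShuffle(conf: Map[String, String]): Boolean =
    conf.get("spark.shuffle.manager").contains("org.apache.spark.shuffle.sort.SortShuffleManager")
}
```

Because the normalized map is immutable, components built later cannot observe a half-updated value. That is the property the comment argues for.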
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51425061
Thanks; updated to deal with comments.
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51295773
test this please
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51303731
QA results for PR 1799:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18009/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51428187
QA results for PR 1799:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18096/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51378648
Yeah, the flaky tests are fixed in #1803.
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51291087
QA results for PR 1799:
- This patch FAILED unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17985/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51552394
LGTM
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1799#discussion_r15915999
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -640,9 +713,122 @@ private[spark] class ExternalSorter[K, V, C](
*/
def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2)
+ /**
+ * Write all the data added into this ExternalSorter into a file in the disk store, creating
+ * an .index file for it as well with the offsets of each partition. This is called by the
+ * SortShuffleWriter and can go through an efficient path of just concatenating binary files
+ * if we decided to avoid merge-sorting.
+ *
+ * @param blockId block ID to write to. The index file will be blockId.name + ".index".
+ * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
+ * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
+ */
+ def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = {
+ val outputFile = blockManager.diskBlockManager.getFile(blockId)
+
+ // Track location of each range in the output file
+ val offsets = new Array[Long](numPartitions + 1)
+ val lengths = new Array[Long](numPartitions)
+
+ // Statistics
+ var totalBytes = 0L
+ var totalTime = 0L
+
+ if (bypassMergeSort && partitionWriters != null) {
+ // We decided to write separate files for each partition, so just concatenate them. To keep
+ // this simple we spill out the current in-memory collection so that everything is in files.
+ spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
+ partitionWriters.foreach(_.commitAndClose())
+ var out: FileOutputStream = null
+ var in: FileInputStream = null
+ try {
+ out = new FileOutputStream(outputFile)
+ for (i <- 0 until numPartitions) {
+ val file = partitionWriters(i).fileSegment().file
--- End diff --
Yeah, unfortunately that's just how you get the file.
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/1799
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:
https://github.com/apache/spark/pull/1799#discussion_r15898996
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -120,6 +128,18 @@ private[spark] class ExternalSorter[K, V, C](
// How much of the shared memory pool this collection has claimed
private var myMemoryThreshold = 0L
+ // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need
+ // local aggregation and sorting, write numPartitions files directly and just concatenate them
+ // at the end. This avoids doing serialization and deserialization twice to merge together the
+ // spilled files, which would happen with the normal code path. The downside is having multiple
+ // files open at a time and thus more memory allocated to buffers.
+ private val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+ private[collection] val bypassMergeSort = // private[collection] for unit tests
--- End diff --
You can use this in unit tests: http://doc.scalatest.org/1.4.1/org/scalatest/PrivateMethodTester.html
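The comment in the ExternalSorter hunk describes when the bypass path applies. Below is a minimal sketch of that decision. It assumes the check combines three conditions: the partition count is within the threshold (default 200, read from `spark.shuffle.sort.bypassMergeThreshold`), there is no aggregator, and there is no key ordering. `BypassSketch` and `shouldBypassMergeSort` are hypothetical names, and a `Map[String, String]` stands in for `SparkConf`. Whether the threshold is inclusive is also an assumption; this sketch uses `<=`.

```scala
// Hypothetical helper mirroring the conditions described in the diff comment.
object BypassSketch {
  val DefaultBypassMergeThreshold = 200

  // Bypass the merge-sort only when no map-side aggregation and no key ordering
  // are needed and the number of output partitions is small enough
  // (the boundary is treated as inclusive here by assumption).
  def shouldBypassMergeSort(
      conf: Map[String, String],
      numPartitions: Int,
      hasAggregator: Boolean,
      hasOrdering: Boolean): Boolean = {
    val threshold = conf
      .get("spark.shuffle.sort.bypassMergeThreshold")
      .map(_.toInt)
      .getOrElse(DefaultBypassMergeThreshold)
    numPartitions <= threshold && !hasAggregator && !hasOrdering
  }
}
```

A groupByKey-style shuffle with a few dozen partitions would take the bypass path. A reduceByKey-style shuffle with map-side combining would not, because its aggregator still requires the regular code path.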
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51296001
QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18000/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51300053
test this please
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51377222
@rxin / @andrewor14 it would be good if you could review this when you have a chance. This is something we should add in 1.1, since sort-based shuffle is still off by default.
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51295530
QA tests have started for PR 1799. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17999/consoleFull
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1799#discussion_r16033699
--- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
@@ -246,8 +250,13 @@ object SparkEnv extends Logging {
"."
}
- val shuffleManager = instantiateClass[ShuffleManager](
- "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
+ // Let the user specify short names for shuffle managers
+ val shortShuffleMgrNames = Map(
+ "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+ "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+ val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
--- End diff --
I'd rather not change the configuration under the user; that would be confusing if they later print it or look at it in the web UI. Instead, maybe add a SparkEnv.getShuffleManagerClass(conf: SparkConf) that can return the real class name.
Also I'd be fine initializing the ShuffleBlockManager after the ShuffleManager if that works, and using isInstanceOf. That would be the cleanest.
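Below is a minimal sketch of the suggested helper. It resolves the short name to a fully qualified class name on demand instead of rewriting the user's configuration. The short-name map and the `"hash"` default are taken from the diff above. The `ShuffleManagerNames` object is hypothetical, a `Map[String, String]` stands in for `SparkConf`, and two behaviors are assumptions: lookup is case-insensitive, and any unrecognized value is passed through as a class name.

```scala
// Hypothetical holder for the helper suggested in the review comment.
object ShuffleManagerNames {
  // Short names from the diff, mapped to their fully qualified class names.
  private val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // Returns the real class name without modifying the configuration, so the value
  // the user set is still what they see when printing the conf or in the web UI.
  def getShuffleManagerClass(conf: Map[String, String]): String = {
    val name = conf.getOrElse("spark.shuffle.manager", "hash")
    shortShuffleMgrNames.getOrElse(name.toLowerCase, name)
  }
}
```

With this approach, code that needs to know which manager is active calls the helper rather than reading a rewritten config value. The alternative mentioned in the comment is to construct the ShuffleManager first and check its type with `isInstanceOf`.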
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51377274
BTW, the test failures both times were in a Flume streaming test, which might just be flaky.
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51376950
test this please
[GitHub] spark pull request: SPARK-2787: Make sort-based shuffle write file...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1799#issuecomment-51290081
QA tests have started for PR 1799. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17985/consoleFull