You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/09/03 07:15:57 UTC
spark git commit: [SPARK-10379] preserve first page in UnsafeShuffleExternalSorter
Repository: spark
Updated Branches:
refs/heads/master 3ddb9b323 -> 62b4690d6
[SPARK-10379] preserve first page in UnsafeShuffleExternalSorter
Author: Davies Liu <da...@databricks.com>
Closes #8543 from davies/preserve_page.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62b4690d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62b4690d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62b4690d
Branch: refs/heads/master
Commit: 62b4690d6b3016f41292b640ac28644ef31e299d
Parents: 3ddb9b3
Author: Davies Liu <da...@databricks.com>
Authored: Wed Sep 2 22:15:54 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed Sep 2 22:15:54 2015 -0700
----------------------------------------------------------------------
.../spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java | 4 ++++
.../org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala | 2 +-
.../apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java | 5 +++--
3 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/62b4690d/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index 3d1ef0c..e73ba39 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -122,6 +122,10 @@ final class UnsafeShuffleExternalSorter {
this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
+
+ // preserve first page to ensure that we have at least one page to work with. Otherwise,
+ // other operators in the same task may starve this sorter (SPARK-9709).
+ acquireNewPageIfNecessary(pageSizeBytes);
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/62b4690d/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
index 1f2213d..417ff52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithPreparationRDD.scala
@@ -41,7 +41,7 @@ private[spark] class MapPartitionsWithPreparationRDD[U: ClassTag, T: ClassTag, M
// In certain join operations, prepare can be called on the same partition multiple times.
// In this case, we need to ensure that each call to compute gets a separate prepare argument.
- private[this] var preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
+ private[this] val preparedArguments: ArrayBuffer[M] = new ArrayBuffer[M]
/**
* Prepare a partition for a single call to compute.
http://git-wip-us.apache.org/repos/asf/spark/blob/62b4690d/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 94650be..a266b0c 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -530,8 +530,9 @@ public class UnsafeShuffleWriterSuite {
for (int i = 0; i < numRecordsPerPage * 10; i++) {
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
newPeakMemory = writer.getPeakMemoryUsedBytes();
- if (i % numRecordsPerPage == 0) {
- // We allocated a new page for this record, so peak memory should change
+ if (i % numRecordsPerPage == 0 && i != 0) {
+ // The first page is allocated in constructor, another page will be allocated after
+ // every numRecordsPerPage records (peak memory should change).
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
assertEquals(previousPeakMemory, newPeakMemory);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org