You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/29 06:25:56 UTC
spark git commit: [SPARK-3577] Report Spill size on disk for UnsafeExternalSorter
Repository: spark
Updated Branches:
refs/heads/master 82e24912d -> a946be35a
[SPARK-3577] Report Spill size on disk for UnsafeExternalSorter
## What changes were proposed in this pull request?
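Report the spill size on disk for UnsafeExternalSorter. The bytes written while spilling are now added to the task's diskBytesSpilled metric. Previously they were collected in a throwaway ShuffleWriteMetrics and discarded (the stopgap from SPARK-16827).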
## How was this patch tested?
Tested by running a job on a cluster and verifying the spill size on disk, and by a new unit test, UnsafeExternalSorterSuite.testDiskSpilledBytes.
Author: Sital Kedia <sk...@fb.com>
Closes #17471 from sitalkedia/fix_disk_spill_size.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a946be35
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a946be35
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a946be35
Branch: refs/heads/master
Commit: a946be35ac177737e99942ad42de6f319f186138
Parents: 82e2491
Author: Sital Kedia <sk...@fb.com>
Authored: Thu Jun 29 14:25:51 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jun 29 14:25:51 2017 +0800
----------------------------------------------------------------------
.../unsafe/sort/UnsafeExternalSorter.java | 9 ++++---
.../unsafe/sort/UnsafeExternalSorterSuite.java | 25 ++++++++++++++++++++
2 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a946be35/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index f312fa2..82d03e3 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -54,7 +54,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private final BlockManager blockManager;
private final SerializerManager serializerManager;
private final TaskContext taskContext;
- private ShuffleWriteMetrics writeMetrics;
/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSizeBytes;
@@ -144,10 +143,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
// this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024
this.fileBufferSizeBytes = 32 * 1024;
- // The spill metrics are stored in a new ShuffleWriteMetrics,
- // and then discarded (this fixes SPARK-16827).
- // TODO: Instead, separate spill metrics should be stored and reported (tracked in SPARK-3577).
- this.writeMetrics = new ShuffleWriteMetrics();
if (existingInMemorySorter == null) {
this.inMemSorter = new UnsafeInMemorySorter(
@@ -199,6 +194,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
spillWriters.size(),
spillWriters.size() > 1 ? " times" : " time");
+ ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
// We only write out contents of the inMemSorter if it is not empty.
if (inMemSorter.numRecords() > 0) {
final UnsafeSorterSpillWriter spillWriter =
@@ -226,6 +222,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// pages, we might not be able to get memory for the pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+ taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
totalSpillBytes += spillSize;
return spillSize;
}
@@ -502,6 +499,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
UnsafeInMemorySorter.SortedIterator inMemIterator =
((UnsafeInMemorySorter.SortedIterator) upstream).clone();
+ ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
// Iterate over the records that have not been returned and spill them.
final UnsafeSorterSpillWriter spillWriter =
new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
@@ -540,6 +538,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
inMemSorter.free();
inMemSorter = null;
taskContext.taskMetrics().incMemoryBytesSpilled(released);
+ taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
totalSpillBytes += released;
return released;
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a946be35/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 771d390..d31d7c1 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -406,6 +406,31 @@ public class UnsafeExternalSorterSuite {
}
@Test
+ public void testDiskSpilledBytes() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ long[] record = new long[100];
+ int recordSize = record.length * 8;
+ int n = (int) pageSizeBytes / recordSize * 3;
+ for (int i = 0; i < n; i++) {
+ record[0] = (long) i;
+ sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, false);
+ }
+ // We will have at least 2 memory pages allocated, because of the rounding caused by
+ // the integer division of pageSizeBytes by recordSize.
+ assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
+ assertTrue(taskContext.taskMetrics().diskBytesSpilled() == 0);
+ UnsafeExternalSorter.SpillableIterator iter =
+ (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
+ assertTrue(iter.spill() > 0);
+ assertTrue(taskContext.taskMetrics().diskBytesSpilled() > 0);
+ assertEquals(0, iter.spill());
+ // The second spill released nothing, but the bytes from the first spill must still be reported.
+ assertTrue(taskContext.taskMetrics().diskBytesSpilled() > 0);
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
+ }
+
+ @Test
public void testPeakMemoryUsed() throws Exception {
final long recordLengthBytes = 8;
final long pageSizeBytes = 256;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org