You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/29 06:25:56 UTC

spark git commit: [SPARK-3577] Report Spill size on disk for UnsafeExternalSorter

Repository: spark
Updated Branches:
  refs/heads/master 82e24912d -> a946be35a


[SPARK-3577] Report Spill size on disk for UnsafeExternalSorter

## What changes were proposed in this pull request?

Report Spill size on disk for UnsafeExternalSorter

## How was this patch tested?

Tested by running a job on cluster and verify the spill size on disk.

Author: Sital Kedia <sk...@fb.com>

Closes #17471 from sitalkedia/fix_disk_spill_size.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a946be35
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a946be35
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a946be35

Branch: refs/heads/master
Commit: a946be35ac177737e99942ad42de6f319f186138
Parents: 82e2491
Author: Sital Kedia <sk...@fb.com>
Authored: Thu Jun 29 14:25:51 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jun 29 14:25:51 2017 +0800

----------------------------------------------------------------------
 .../unsafe/sort/UnsafeExternalSorter.java       |  9 ++++---
 .../unsafe/sort/UnsafeExternalSorterSuite.java  | 25 ++++++++++++++++++++
 2 files changed, 29 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a946be35/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index f312fa2..82d03e3 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -54,7 +54,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
   private final BlockManager blockManager;
   private final SerializerManager serializerManager;
   private final TaskContext taskContext;
-  private ShuffleWriteMetrics writeMetrics;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
@@ -144,10 +143,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
     // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024
     this.fileBufferSizeBytes = 32 * 1024;
-    // The spill metrics are stored in a new ShuffleWriteMetrics,
-    // and then discarded (this fixes SPARK-16827).
-    // TODO: Instead, separate spill metrics should be stored and reported (tracked in SPARK-3577).
-    this.writeMetrics = new ShuffleWriteMetrics();
 
     if (existingInMemorySorter == null) {
       this.inMemSorter = new UnsafeInMemorySorter(
@@ -199,6 +194,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       spillWriters.size(),
       spillWriters.size() > 1 ? " times" : " time");
 
+    ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
     // We only write out contents of the inMemSorter if it is not empty.
     if (inMemSorter.numRecords() > 0) {
       final UnsafeSorterSpillWriter spillWriter =
@@ -226,6 +222,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
     // pages, we might not be able to get memory for the pointer array.
 
     taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+    taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
     totalSpillBytes += spillSize;
     return spillSize;
   }
@@ -502,6 +499,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         UnsafeInMemorySorter.SortedIterator inMemIterator =
           ((UnsafeInMemorySorter.SortedIterator) upstream).clone();
 
+       ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
         // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
@@ -540,6 +538,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
         inMemSorter.free();
         inMemSorter = null;
         taskContext.taskMetrics().incMemoryBytesSpilled(released);
+        taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
         totalSpillBytes += released;
         return released;
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/a946be35/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 771d390..d31d7c1 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -406,6 +406,31 @@ public class UnsafeExternalSorterSuite {
   }
 
   @Test
+  public void testDiskSpilledBytes() throws Exception {
+    final UnsafeExternalSorter sorter = newSorter();
+    long[] record = new long[100];
+    int recordSize = record.length * 8;
+    int n = (int) pageSizeBytes / recordSize * 3;
+    for (int i = 0; i < n; i++) {
+      record[0] = (long) i;
+      sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, false);
+    }
+    // We will have at-least 2 memory pages allocated because of rounding happening due to
+    // integer division of pageSizeBytes and recordSize.
+    assertTrue(sorter.getNumberOfAllocatedPages() >= 2);
+    assertTrue(taskContext.taskMetrics().diskBytesSpilled() == 0);
+    UnsafeExternalSorter.SpillableIterator iter =
+            (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
+    assertTrue(iter.spill() > 0);
+    assertTrue(taskContext.taskMetrics().diskBytesSpilled() > 0);
+    assertEquals(0, iter.spill());
+    // Even if we did not spill second time, the disk spilled bytes should still be non-zero
+    assertTrue(taskContext.taskMetrics().diskBytesSpilled() > 0);
+    sorter.cleanupResources();
+    assertSpillFilesWereCleanedUp();
+  }
+
+  @Test
   public void testPeakMemoryUsed() throws Exception {
     final long recordLengthBytes = 8;
     final long pageSizeBytes = 256;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org