Posted to commits@spark.apache.org by we...@apache.org on 2017/07/06 11:49:41 UTC

spark git commit: [SPARK-20950][CORE] add a new config to diskWriteBufferSize which is hard coded before

Repository: spark
Updated Branches:
  refs/heads/master d540dfbff -> 565e7a8d4


[SPARK-20950][CORE] add a new config to diskWriteBufferSize which is hard coded before

## What changes were proposed in this pull request?

This PR makes two improvements:
1. Add a new config, `spark.shuffle.spill.diskWriteBufferSize`, to control the disk write buffer size of `ShuffleExternalSorter`, which was previously hard coded (a usage sketch follows this list).
    Varying the buffer size while testing `forceSorterToSpill` gave the following average times over 10 runs (in ms):
```
diskWriteBufferSize:       1M    512K    256K    128K    64K    32K    16K    8K    4K
---------------------------------------------------------------------------------------
RecordSize = 2.5M          742   722     694     686     667    668    671    669   683
RecordSize = 1M            294   293     292     287     283    285    281    279   285
```

2. Move the initialization of `outputBufferSizeInBytes` and `inputBufferSizeInBytes` out of the `mergeSpillsWithFileStream` function and into the `UnsafeShuffleWriter` constructor, so the configuration is parsed once per writer rather than on every merge.
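
A usage sketch for the new knob (the key and its 1m default come from this patch; the "64k" value is just an example):
```
import org.apache.spark.SparkConf

// The value is parsed as bytes, so size suffixes such as "64k" work.
val conf = new SparkConf()
  .set("spark.shuffle.spill.diskWriteBufferSize", "64k")

// Reading it back with a plain SparkConf accessor (default is 1m):
val bufferSize = conf.getSizeAsBytes("spark.shuffle.spill.diskWriteBufferSize", "1m")
assert(bufferSize == 64 * 1024)
```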

## How was this patch tested?
Existing unit tests.

Author: caoxuewen <ca...@zte.com.cn>

Closes #18174 from heary-cao/buffersize.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/565e7a8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/565e7a8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/565e7a8d

Branch: refs/heads/master
Commit: 565e7a8d4ae7879ee704fb94ae9b3da31e202d7e
Parents: d540dfb
Author: caoxuewen <ca...@zte.com.cn>
Authored: Thu Jul 6 19:49:34 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 6 19:49:34 2017 +0800

----------------------------------------------------------------------
 .../shuffle/sort/ShuffleExternalSorter.java     | 11 +++++---
 .../spark/shuffle/sort/UnsafeShuffleWriter.java | 14 +++++++---
 .../unsafe/sort/UnsafeSorterSpillWriter.java    | 24 ++++++++++-------
 .../apache/spark/internal/config/package.scala  | 27 ++++++++++++++++++++
 4 files changed, 60 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index c33d1e3..338faaa 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -43,6 +43,7 @@ import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.LongArray;
 import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.Utils;
+import org.apache.spark.internal.config.package$;
 
 /**
  * An external sorter that is specialized for sort-based shuffle.
@@ -82,6 +83,9 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
   private final int fileBufferSizeBytes;
 
+  /** The buffer size to use when writing the sorted records to an on-disk file */
+  private final int diskWriteBufferSize;
+
   /**
    * Memory pages that hold the records being sorted. The pages in this list are freed when
    * spilling, although in principle we could recycle these pages across spills (on the other hand,
@@ -116,13 +120,14 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     this.taskContext = taskContext;
     this.numPartitions = numPartitions;
     // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
-    this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.fileBufferSizeBytes = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
     this.numElementsForSpillThreshold =
       conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
     this.writeMetrics = writeMetrics;
     this.inMemSorter = new ShuffleInMemorySorter(
       this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
     this.peakMemoryUsedBytes = getMemoryUsage();
+    this.diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
   }
 
   /**
@@ -155,7 +160,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
     // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
     // data through a byte array. This array does not need to be large enough to hold a single
     // record;
-    final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+    final byte[] writeBuffer = new byte[diskWriteBufferSize];
 
     // Because this output will be read during shuffle, its compression codec must be controlled by
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -195,7 +200,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
       int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
       long recordReadPosition = recordOffsetInPage + 4; // skip over record length
       while (dataRemaining > 0) {
-        final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
+        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
         Platform.copyMemory(
           recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
         writer.write(writeBuffer, 0, toTransfer);
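
The loop parameterized above streams each record from its memory page to the DiskBlockObjectWriter through a small reusable buffer; the buffer does not need to hold a whole record. (The `(int) (long)` double casts in the constructor are needed because `conf.get` on a bytes entry hands Java a boxed `java.lang.Long`, which is unboxed to `long` and then narrowed to `int`.) A minimal on-heap sketch of the same chunked-copy pattern (hypothetical helper, not Spark code):
```
import java.io.OutputStream

// Copy `length` bytes starting at `offset` from an in-memory page to `out`,
// at most `bufferSize` bytes at a time.
def writeChunked(page: Array[Byte], offset: Int, length: Int,
                 out: OutputStream, bufferSize: Int): Unit = {
  val buffer = new Array[Byte](bufferSize)
  var remaining = length
  var readPos = offset
  while (remaining > 0) {
    val toTransfer = math.min(bufferSize, remaining)
    System.arraycopy(page, readPos, buffer, 0, toTransfer)
    out.write(buffer, 0, toTransfer)
    readPos += toTransfer
    remaining -= toTransfer
  }
}
```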

http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 34c1799..1b57849 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -55,6 +55,7 @@ import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.Utils;
+import org.apache.spark.internal.config.package$;
 
 @Private
 public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -65,6 +66,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
 
   @VisibleForTesting
   static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
+  static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024;
 
   private final BlockManager blockManager;
   private final IndexShuffleBlockResolver shuffleBlockResolver;
@@ -78,6 +80,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   private final SparkConf sparkConf;
   private final boolean transferToEnabled;
   private final int initialSortBufferSize;
+  private final int inputBufferSizeInBytes;
+  private final int outputBufferSizeInBytes;
 
   @Nullable private MapStatus mapStatus;
   @Nullable private ShuffleExternalSorter sorter;
@@ -140,6 +144,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
     this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
     this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize",
                                                   DEFAULT_INITIAL_SORT_BUFFER_SIZE);
+    this.inputBufferSizeInBytes =
+      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
+    this.outputBufferSizeInBytes =
+      (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
     open();
   }
 
@@ -209,7 +217,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
       partitioner.numPartitions(),
       sparkConf,
       writeMetrics);
-    serBuffer = new MyByteArrayOutputStream(1024 * 1024);
+    serBuffer = new MyByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
     serOutputStream = serializer.serializeStream(serBuffer);
   }
 
@@ -360,12 +368,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
 
     final OutputStream bos = new BufferedOutputStream(
             new FileOutputStream(outputFile),
-            (int) sparkConf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") * 1024);
+            outputBufferSizeInBytes);
     // Use a counting output stream to avoid having to close the underlying file and ask
     // the file system for its size after each partition is written.
     final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos);
-    final int inputBufferSizeInBytes =
-      (int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
 
     boolean threwException = true;
     try {
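
The shape of this refactor, sketched with hypothetical names (not Spark code): parse the buffer sizes once at construction so the merge path no longer touches the SparkConf:
```
import java.io.{BufferedOutputStream, File, FileOutputStream}
import org.apache.spark.SparkConf

class MergingWriter(conf: SparkConf) {
  // Read and convert once per writer instead of once per merge.
  private val outputBufferSizeInBytes: Int =
    conf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k").toInt * 1024

  def mergeInto(outputFile: File): Unit = {
    val bos = new BufferedOutputStream(
      new FileOutputStream(outputFile), outputBufferSizeInBytes)
    try {
      // ... copy spill bytes into bos ...
    } finally {
      bos.close()
    }
  }
}
```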

http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index 164b9d7..f9b5493 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -20,9 +20,10 @@ package org.apache.spark.util.collection.unsafe.sort;
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.spark.serializer.SerializerManager;
 import scala.Tuple2;
 
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.SerializerManager;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.serializer.DummySerializerInstance;
 import org.apache.spark.storage.BlockId;
@@ -30,6 +31,7 @@ import org.apache.spark.storage.BlockManager;
 import org.apache.spark.storage.DiskBlockObjectWriter;
 import org.apache.spark.storage.TempLocalBlockId;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.internal.config.package$;
 
 /**
  * Spills a list of sorted records to disk. Spill files have the following format:
@@ -38,12 +40,16 @@ import org.apache.spark.unsafe.Platform;
  */
 public final class UnsafeSorterSpillWriter {
 
-  static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
+  private final SparkConf conf = new SparkConf();
+
+  /** The buffer size to use when writing the sorted records to an on-disk file */
+  private final int diskWriteBufferSize =
+    (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
 
   // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
   // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
   // data through a byte array.
-  private byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+  private byte[] writeBuffer = new byte[diskWriteBufferSize];
 
   private final File file;
   private final BlockId blockId;
@@ -114,7 +120,7 @@ public final class UnsafeSorterSpillWriter {
     writeIntToBuffer(recordLength, 0);
     writeLongToBuffer(keyPrefix, 4);
     int dataRemaining = recordLength;
-    int freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE - 4 - 8; // space used by prefix + len
+    int freeSpaceInWriteBuffer = diskWriteBufferSize - 4 - 8; // space used by prefix + len
     long recordReadPosition = baseOffset;
     while (dataRemaining > 0) {
       final int toTransfer = Math.min(freeSpaceInWriteBuffer, dataRemaining);
@@ -122,15 +128,15 @@ public final class UnsafeSorterSpillWriter {
         baseObject,
         recordReadPosition,
         writeBuffer,
-        Platform.BYTE_ARRAY_OFFSET + (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer),
+        Platform.BYTE_ARRAY_OFFSET + (diskWriteBufferSize - freeSpaceInWriteBuffer),
         toTransfer);
-      writer.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer) + toTransfer);
+      writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer) + toTransfer);
       recordReadPosition += toTransfer;
       dataRemaining -= toTransfer;
-      freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE;
+      freeSpaceInWriteBuffer = diskWriteBufferSize;
     }
-    if (freeSpaceInWriteBuffer < DISK_WRITE_BUFFER_SIZE) {
-      writer.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer));
+    if (freeSpaceInWriteBuffer < diskWriteBufferSize) {
+      writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer));
     }
     writer.recordWritten();
   }
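
For reference, each spilled record above is framed as a 4-byte length plus an 8-byte key prefix followed by the record bytes, which is why `4 + 8` bytes of the now-configurable buffer are reserved on the first pass through the loop. A minimal sketch of that framing (hypothetical, not Spark code):
```
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Frame one record the way the spill file lays it out:
// [record length: int][key prefix: long][record bytes].
def frameRecord(record: Array[Byte], keyPrefix: Long): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  out.writeInt(record.length) // 4-byte length header
  out.writeLong(keyPrefix)    // 8-byte sort-key prefix
  out.write(record)
  out.flush()
  bytes.toByteArray
}
```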

http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8dee0d9..a629810 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -336,4 +336,31 @@ package object config {
         "spark.")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.file.buffer")
+      .doc("Size of the in-memory buffer for each shuffle file output stream. " +
+        "These buffers reduce the number of disk seeks and system calls made " +
+        "in creating intermediate shuffle files.")
+      .bytesConf(ByteUnit.KiB)
+      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
+        s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .createWithDefaultString("32k")
+
+  private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
+      .doc("The file system for this buffer size after each partition " +
+        "is written in unsafe shuffle writer.")
+      .bytesConf(ByteUnit.KiB)
+      .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
+        s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+      .createWithDefaultString("32k")
+
+  private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
+      .doc("The buffer size to use when writing the sorted records to an on-disk file.")
+      .bytesConf(ByteUnit.BYTE)
+      .checkValue(v => v > 0 && v <= Int.MaxValue,
+        s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
+      .createWithDefault(1024 * 1024)
 }
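
Note the bounds on these entries: the two KiB-denominated sizes are capped at `Int.MaxValue / 1024` because the call sites multiply the KiB value by 1024 before narrowing to `int`, so the cap keeps the product within `Int` range. A user-side sketch of the three defaults (illustrative, using plain SparkConf accessors):
```
import org.apache.spark.SparkConf

val conf = new SparkConf()
// The two KiB-denominated entries default to 32 KiB:
assert(conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") == 32)
assert(conf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") == 32)
// The spill write buffer is byte-denominated and defaults to 1 MiB:
assert(conf.getSizeAsBytes("spark.shuffle.spill.diskWriteBufferSize", "1m") == 1024 * 1024)
```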


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org