You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/06 11:49:41 UTC
spark git commit: [SPARK-20950][CORE] add a new config for
diskWriteBufferSize, which was previously hard-coded
Repository: spark
Updated Branches:
refs/heads/master d540dfbff -> 565e7a8d4
[SPARK-20950][CORE] add a new config for diskWriteBufferSize, which was previously hard-coded
## What changes were proposed in this pull request?
This PR makes two improvements:
1. Makes the diskWriteBufferSize of ShuffleExternalSorter configurable via spark.shuffle.spill.diskWriteBufferSize (previously a hard-coded constant).
When varying diskWriteBufferSize while testing `forceSorterToSpill`,
the average running time over 10 runs is as follows (unit: ms):
```
diskWriteBufferSize: 1M 512K 256K 128K 64K 32K 16K 8K 4K
---------------------------------------------------------------------------------------
RecordSize = 2.5M 742 722 694 686 667 668 671 669 683
RecordSize = 1M 294 293 292 287 283 285 281 279 285
```
2. Moves the initialization of outputBufferSizeInBytes and inputBufferSizeInBytes out of the mergeSpillsWithFileStream function; they are now final fields initialized once in the UnsafeShuffleWriter constructor.
## How was this patch tested?
Unit tests.
Author: caoxuewen <ca...@zte.com.cn>
Closes #18174 from heary-cao/buffersize.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/565e7a8d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/565e7a8d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/565e7a8d
Branch: refs/heads/master
Commit: 565e7a8d4ae7879ee704fb94ae9b3da31e202d7e
Parents: d540dfb
Author: caoxuewen <ca...@zte.com.cn>
Authored: Thu Jul 6 19:49:34 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 6 19:49:34 2017 +0800
----------------------------------------------------------------------
.../shuffle/sort/ShuffleExternalSorter.java | 11 +++++---
.../spark/shuffle/sort/UnsafeShuffleWriter.java | 14 +++++++---
.../unsafe/sort/UnsafeSorterSpillWriter.java | 24 ++++++++++-------
.../apache/spark/internal/config/package.scala | 27 ++++++++++++++++++++
4 files changed, 60 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index c33d1e3..338faaa 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -43,6 +43,7 @@ import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.Utils;
+import org.apache.spark.internal.config.package$;
/**
* An external sorter that is specialized for sort-based shuffle.
@@ -82,6 +83,9 @@ final class ShuffleExternalSorter extends MemoryConsumer {
/** The buffer size to use when writing spills using DiskBlockObjectWriter */
private final int fileBufferSizeBytes;
+ /** The buffer size to use when writing the sorted records to an on-disk file */
+ private final int diskWriteBufferSize;
+
/**
* Memory pages that hold the records being sorted. The pages in this list are freed when
* spilling, although in principle we could recycle these pages across spills (on the other hand,
@@ -116,13 +120,14 @@ final class ShuffleExternalSorter extends MemoryConsumer {
this.taskContext = taskContext;
this.numPartitions = numPartitions;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
- this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+ this.fileBufferSizeBytes = (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
this.numElementsForSpillThreshold =
conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 * 1024 * 1024);
this.writeMetrics = writeMetrics;
this.inMemSorter = new ShuffleInMemorySorter(
this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
this.peakMemoryUsedBytes = getMemoryUsage();
+ this.diskWriteBufferSize = (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
}
/**
@@ -155,7 +160,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// data through a byte array. This array does not need to be large enough to hold a single
// record;
- final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+ final byte[] writeBuffer = new byte[diskWriteBufferSize];
// Because this output will be read during shuffle, its compression codec must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
@@ -195,7 +200,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
long recordReadPosition = recordOffsetInPage + 4; // skip over record length
while (dataRemaining > 0) {
- final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
+ final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
Platform.copyMemory(
recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
writer.write(writeBuffer, 0, toTransfer);
http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index 34c1799..1b57849 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -55,6 +55,7 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
+import org.apache.spark.internal.config.package$;
@Private
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@@ -65,6 +66,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
@VisibleForTesting
static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
+ static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1024 * 1024;
private final BlockManager blockManager;
private final IndexShuffleBlockResolver shuffleBlockResolver;
@@ -78,6 +80,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final SparkConf sparkConf;
private final boolean transferToEnabled;
private final int initialSortBufferSize;
+ private final int inputBufferSizeInBytes;
+ private final int outputBufferSizeInBytes;
@Nullable private MapStatus mapStatus;
@Nullable private ShuffleExternalSorter sorter;
@@ -140,6 +144,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize",
DEFAULT_INITIAL_SORT_BUFFER_SIZE);
+ this.inputBufferSizeInBytes =
+ (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;
+ this.outputBufferSizeInBytes =
+ (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
open();
}
@@ -209,7 +217,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
partitioner.numPartitions(),
sparkConf,
writeMetrics);
- serBuffer = new MyByteArrayOutputStream(1024 * 1024);
+ serBuffer = new MyByteArrayOutputStream(DEFAULT_INITIAL_SER_BUFFER_SIZE);
serOutputStream = serializer.serializeStream(serBuffer);
}
@@ -360,12 +368,10 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
final OutputStream bos = new BufferedOutputStream(
new FileOutputStream(outputFile),
- (int) sparkConf.getSizeAsKb("spark.shuffle.unsafe.file.output.buffer", "32k") * 1024);
+ outputBufferSizeInBytes);
// Use a counting output stream to avoid having to close the underlying file and ask
// the file system for its size after each partition is written.
final CountingOutputStream mergedFileOutputStream = new CountingOutputStream(bos);
- final int inputBufferSizeInBytes =
- (int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
boolean threwException = true;
try {
http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index 164b9d7..f9b5493 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -20,9 +20,10 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.io.File;
import java.io.IOException;
-import org.apache.spark.serializer.SerializerManager;
import scala.Tuple2;
+import org.apache.spark.SparkConf;
+import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.storage.BlockId;
@@ -30,6 +31,7 @@ import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.TempLocalBlockId;
import org.apache.spark.unsafe.Platform;
+import org.apache.spark.internal.config.package$;
/**
* Spills a list of sorted records to disk. Spill files have the following format:
@@ -38,12 +40,16 @@ import org.apache.spark.unsafe.Platform;
*/
public final class UnsafeSorterSpillWriter {
- static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
+ private final SparkConf conf = new SparkConf();
+
+ /** The buffer size to use when writing the sorted records to an on-disk file */
+ private final int diskWriteBufferSize =
+ (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
// be an API to directly transfer bytes from managed memory to the disk writer, we buffer
// data through a byte array.
- private byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+ private byte[] writeBuffer = new byte[diskWriteBufferSize];
private final File file;
private final BlockId blockId;
@@ -114,7 +120,7 @@ public final class UnsafeSorterSpillWriter {
writeIntToBuffer(recordLength, 0);
writeLongToBuffer(keyPrefix, 4);
int dataRemaining = recordLength;
- int freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE - 4 - 8; // space used by prefix + len
+ int freeSpaceInWriteBuffer = diskWriteBufferSize - 4 - 8; // space used by prefix + len
long recordReadPosition = baseOffset;
while (dataRemaining > 0) {
final int toTransfer = Math.min(freeSpaceInWriteBuffer, dataRemaining);
@@ -122,15 +128,15 @@ public final class UnsafeSorterSpillWriter {
baseObject,
recordReadPosition,
writeBuffer,
- Platform.BYTE_ARRAY_OFFSET + (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer),
+ Platform.BYTE_ARRAY_OFFSET + (diskWriteBufferSize - freeSpaceInWriteBuffer),
toTransfer);
- writer.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer) + toTransfer);
+ writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer) + toTransfer);
recordReadPosition += toTransfer;
dataRemaining -= toTransfer;
- freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE;
+ freeSpaceInWriteBuffer = diskWriteBufferSize;
}
- if (freeSpaceInWriteBuffer < DISK_WRITE_BUFFER_SIZE) {
- writer.write(writeBuffer, 0, (DISK_WRITE_BUFFER_SIZE - freeSpaceInWriteBuffer));
+ if (freeSpaceInWriteBuffer < diskWriteBufferSize) {
+ writer.write(writeBuffer, 0, (diskWriteBufferSize - freeSpaceInWriteBuffer));
}
writer.recordWritten();
}
http://git-wip-us.apache.org/repos/asf/spark/blob/565e7a8d/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 8dee0d9..a629810 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -336,4 +336,31 @@ package object config {
"spark.")
.booleanConf
.createWithDefault(false)
+
+ private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
+ ConfigBuilder("spark.shuffle.file.buffer")
+ .doc("Size of the in-memory buffer for each shuffle file output stream. " +
+ "These buffers reduce the number of disk seeks and system calls made " +
+ "in creating intermediate shuffle files.")
+ .bytesConf(ByteUnit.KiB)
+ .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
+ s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+ .createWithDefaultString("32k")
+
+ private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
+ ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
+ .doc("The file system for this buffer size after each partition " +
+ "is written in unsafe shuffle writer.")
+ .bytesConf(ByteUnit.KiB)
+ .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
+ s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
+ .createWithDefaultString("32k")
+
+ private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
+ ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
+ .doc("The buffer size to use when writing the sorted records to an on-disk file.")
+ .bytesConf(ByteUnit.BYTE)
+ .checkValue(v => v > 0 && v <= Int.MaxValue,
+ s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
+ .createWithDefault(1024 * 1024)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org