You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/08/24 01:48:12 UTC
spark git commit: [SPARK-16862] Configurable buffer size in
`UnsafeSorterSpillReader`
Repository: spark
Updated Branches:
refs/heads/master bf8ff833e -> c1937dd19
[SPARK-16862] Configurable buffer size in `UnsafeSorterSpillReader`
## What changes were proposed in this pull request?
Jira: https://issues.apache.org/jira/browse/SPARK-16862
`BufferedInputStream` used in `UnsafeSorterSpillReader` uses the default 8 KB buffer to read data off disk. This PR makes the buffer size configurable to improve disk reads. I have set the default value to 1 MB, as I observed improved performance with that value.
## How was this patch tested?
I am relying on the existing unit tests.
## Performance
After deploying this change to prod and setting the config to 1 MB, there was a 12% reduction in CPU time and a 19.5% reduction in CPU reservation time.
Author: Tejas Patil <te...@fb.com>
Closes #14726 from tejasapatil/spill_buffer_2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1937dd1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1937dd1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1937dd1
Branch: refs/heads/master
Commit: c1937dd19a23bd096a4707656c7ba19fb5c16966
Parents: bf8ff83
Author: Tejas Patil <te...@fb.com>
Authored: Tue Aug 23 18:48:08 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Aug 23 18:48:08 2016 -0700
----------------------------------------------------------------------
.../unsafe/sort/UnsafeSorterSpillReader.java | 22 +++++++++++++++++++-
1 file changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c1937dd1/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 1d588c3..d048cf7 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -22,15 +22,21 @@ import java.io.*;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
+import org.apache.spark.SparkEnv;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
import org.apache.spark.unsafe.Platform;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reads spill files written by {@link UnsafeSorterSpillWriter} (see that class for a description
* of the file format).
*/
public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implements Closeable {
+ private static final Logger logger = LoggerFactory.getLogger(UnsafeSorterSpillReader.class);
+ private static final int DEFAULT_BUFFER_SIZE_BYTES = 1024 * 1024; // 1 MB
+ private static final int MAX_BUFFER_SIZE_BYTES = 16777216; // 16 mb
private InputStream in;
private DataInputStream din;
@@ -50,7 +56,21 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
File file,
BlockId blockId) throws IOException {
assert (file.length() > 0);
- final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file));
+ long bufferSizeBytes =
+ SparkEnv.get() == null ?
+ DEFAULT_BUFFER_SIZE_BYTES:
+ SparkEnv.get().conf().getSizeAsBytes("spark.unsafe.sorter.spill.reader.buffer.size",
+ DEFAULT_BUFFER_SIZE_BYTES);
+ if (bufferSizeBytes > MAX_BUFFER_SIZE_BYTES || bufferSizeBytes < DEFAULT_BUFFER_SIZE_BYTES) {
+ // fall back to a sane default value
+ logger.warn("Value of config \"spark.unsafe.sorter.spill.reader.buffer.size\" = {} not in " +
+ "allowed range [{}, {}). Falling back to default value : {} bytes", bufferSizeBytes,
+ DEFAULT_BUFFER_SIZE_BYTES, MAX_BUFFER_SIZE_BYTES, DEFAULT_BUFFER_SIZE_BYTES);
+ bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
+ }
+
+ final BufferedInputStream bs =
+ new BufferedInputStream(new FileInputStream(file), (int) bufferSizeBytes);
try {
this.in = serializerManager.wrapForCompression(blockId, bs);
this.din = new DataInputStream(this.in);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org