Posted to commits@spark.apache.org by jo...@apache.org on 2015/10/22 18:46:42 UTC
[4/4] spark git commit: [SPARK-10708] Consolidate sort shuffle implementations
[SPARK-10708] Consolidate sort shuffle implementations
There's a lot of duplication between SortShuffleManager and UnsafeShuffleManager. Now that UnsafeShuffleManager supports large records, the two provide the same set of functionality, so I think we should replace SortShuffleManager's serialized shuffle implementation with UnsafeShuffleManager's and merge the two managers together.
Author: Josh Rosen <jo...@databricks.com>
Closes #8829 from JoshRosen/consolidate-sort-shuffle-implementations.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6d06adf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6d06adf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6d06adf
Branch: refs/heads/master
Commit: f6d06adf05afa9c5386dc2396c94e7a98730289f
Parents: 94e2064
Author: Josh Rosen <jo...@databricks.com>
Authored: Thu Oct 22 09:46:30 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Thu Oct 22 09:46:30 2015 -0700
----------------------------------------------------------------------
.../sort/BypassMergeSortShuffleWriter.java | 106 +++-
.../spark/shuffle/sort/PackedRecordPointer.java | 92 +++
.../shuffle/sort/ShuffleExternalSorter.java | 491 ++++++++++++++++
.../shuffle/sort/ShuffleInMemorySorter.java | 124 ++++
.../shuffle/sort/ShuffleSortDataFormat.java | 67 +++
.../shuffle/sort/SortShuffleFileWriter.java | 53 --
.../apache/spark/shuffle/sort/SpillInfo.java | 37 ++
.../spark/shuffle/sort/UnsafeShuffleWriter.java | 489 ++++++++++++++++
.../shuffle/unsafe/PackedRecordPointer.java | 92 ---
.../apache/spark/shuffle/unsafe/SpillInfo.java | 37 --
.../unsafe/UnsafeShuffleExternalSorter.java | 479 ----------------
.../unsafe/UnsafeShuffleInMemorySorter.java | 124 ----
.../unsafe/UnsafeShuffleSortDataFormat.java | 67 ---
.../shuffle/unsafe/UnsafeShuffleWriter.java | 489 ----------------
.../main/scala/org/apache/spark/SparkEnv.scala | 2 +-
.../spark/shuffle/sort/SortShuffleManager.scala | 175 +++++-
.../spark/shuffle/sort/SortShuffleWriter.scala | 28 +-
.../shuffle/unsafe/UnsafeShuffleManager.scala | 202 -------
.../spark/util/collection/ChainedBuffer.scala | 146 -----
.../spark/util/collection/ExternalSorter.scala | 35 +-
.../PartitionedSerializedPairBuffer.scala | 273 ---------
.../shuffle/sort/PackedRecordPointerSuite.java | 102 ++++
.../sort/ShuffleInMemorySorterSuite.java | 124 ++++
.../shuffle/sort/UnsafeShuffleWriterSuite.java | 560 +++++++++++++++++++
.../unsafe/PackedRecordPointerSuite.java | 101 ----
.../UnsafeShuffleInMemorySorterSuite.java | 124 ----
.../unsafe/UnsafeShuffleWriterSuite.java | 560 -------------------
.../org/apache/spark/SortShuffleSuite.scala | 65 +++
.../spark/scheduler/DAGSchedulerSuite.scala | 6 +-
.../BypassMergeSortShuffleWriterSuite.scala | 64 ++-
.../shuffle/sort/SortShuffleManagerSuite.scala | 131 +++++
.../shuffle/sort/SortShuffleWriterSuite.scala | 45 --
.../unsafe/UnsafeShuffleManagerSuite.scala | 129 -----
.../shuffle/unsafe/UnsafeShuffleSuite.scala | 102 ----
.../util/collection/ChainedBufferSuite.scala | 144 -----
.../PartitionedSerializedPairBufferSuite.scala | 148 -----
docs/configuration.md | 7 +-
project/MimaExcludes.scala | 9 +-
.../apache/spark/sql/execution/Exchange.scala | 23 +-
.../execution/UnsafeRowSerializerSuite.scala | 9 +-
40 files changed, 2600 insertions(+), 3461 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index f5d80bb..ee82d67 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -21,21 +21,30 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import javax.annotation.Nullable;
+import scala.None$;
+import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.Partitioner;
+import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.IndexShuffleBlockResolver;
+import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.*;
import org.apache.spark.util.Utils;
@@ -62,7 +71,7 @@ import org.apache.spark.util.Utils;
* <p>
* There have been proposals to completely remove this code path; see SPARK-6026 for details.
*/
-final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<K, V> {
+final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
private final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
@@ -72,31 +81,52 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
private final BlockManager blockManager;
private final Partitioner partitioner;
private final ShuffleWriteMetrics writeMetrics;
+ private final int shuffleId;
+ private final int mapId;
private final Serializer serializer;
+ private final IndexShuffleBlockResolver shuffleBlockResolver;
/** Array of file writers, one for each partition */
private DiskBlockObjectWriter[] partitionWriters;
+ @Nullable private MapStatus mapStatus;
+ private long[] partitionLengths;
+
+ /**
+ * Are we in the process of stopping? Because map tasks can call stop() with success = true
+ * and then call stop() with success = false if they get an exception, we want to make sure
+ * we don't try deleting files, etc. twice.
+ */
+ private boolean stopping = false;
public BypassMergeSortShuffleWriter(
- SparkConf conf,
BlockManager blockManager,
- Partitioner partitioner,
- ShuffleWriteMetrics writeMetrics,
- Serializer serializer) {
+ IndexShuffleBlockResolver shuffleBlockResolver,
+ BypassMergeSortShuffleHandle<K, V> handle,
+ int mapId,
+ TaskContext taskContext,
+ SparkConf conf) {
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
- this.numPartitions = partitioner.numPartitions();
this.blockManager = blockManager;
- this.partitioner = partitioner;
- this.writeMetrics = writeMetrics;
- this.serializer = serializer;
+ final ShuffleDependency<K, V, V> dep = handle.dependency();
+ this.mapId = mapId;
+ this.shuffleId = dep.shuffleId();
+ this.partitioner = dep.partitioner();
+ this.numPartitions = partitioner.numPartitions();
+ this.writeMetrics = new ShuffleWriteMetrics();
+ taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+ this.serializer = Serializer.getSerializer(dep.serializer());
+ this.shuffleBlockResolver = shuffleBlockResolver;
}
@Override
- public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
+ public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
+ partitionLengths = new long[numPartitions];
+ shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -124,13 +154,24 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
for (DiskBlockObjectWriter writer : partitionWriters) {
writer.commitAndClose();
}
+
+ partitionLengths =
+ writePartitionedFile(shuffleBlockResolver.getDataFile(shuffleId, mapId));
+ shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
- @Override
- public long[] writePartitionedFile(
- BlockId blockId,
- TaskContext context,
- File outputFile) throws IOException {
+ @VisibleForTesting
+ long[] getPartitionLengths() {
+ return partitionLengths;
+ }
+
+ /**
+ * Concatenate all of the per-partition files into a single combined file.
+ *
+ * @return array of lengths, in bytes, of each partition of the file (used by map output tracker).
+ */
+ private long[] writePartitionedFile(File outputFile) throws IOException {
// Track location of the partition starts in the output file
final long[] lengths = new long[numPartitions];
if (partitionWriters == null) {
@@ -165,18 +206,33 @@ final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<
}
@Override
- public void stop() throws IOException {
- if (partitionWriters != null) {
- try {
- for (DiskBlockObjectWriter writer : partitionWriters) {
- // This method explicitly does _not_ throw exceptions:
- File file = writer.revertPartialWritesAndClose();
- if (!file.delete()) {
- logger.error("Error while deleting file {}", file.getAbsolutePath());
+ public Option<MapStatus> stop(boolean success) {
+ if (stopping) {
+ return None$.empty();
+ } else {
+ stopping = true;
+ if (success) {
+ if (mapStatus == null) {
+ throw new IllegalStateException("Cannot call stop(true) without having called write()");
+ }
+ return Option.apply(mapStatus);
+ } else {
+ // The map task failed, so delete our output data.
+ if (partitionWriters != null) {
+ try {
+ for (DiskBlockObjectWriter writer : partitionWriters) {
+ // This method explicitly does _not_ throw exceptions:
+ File file = writer.revertPartialWritesAndClose();
+ if (!file.delete()) {
+ logger.error("Error while deleting file {}", file.getAbsolutePath());
+ }
+ }
+ } finally {
+ partitionWriters = null;
}
}
- } finally {
- partitionWriters = null;
+ shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
+ return None$.empty();
}
}
}
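
The `stopping` flag above implements the guard described in the comment: a map task may call stop(true) and then stop(false) if it later hits an exception, and only the first call should act. A minimal standalone sketch of that contract (illustrative names, not part of this patch):

    import java.util.Optional;

    final class TwoPhaseStopDemo {
      private boolean stopping = false;
      private String mapStatus = null;

      void write() { mapStatus = "map-status"; }

      // Mirrors the stop(success) contract: idempotent, success-aware cleanup.
      Optional<String> stop(boolean success) {
        if (stopping) {
          return Optional.empty();               // second call: do nothing
        }
        stopping = true;
        if (success) {
          if (mapStatus == null) {
            throw new IllegalStateException("Cannot call stop(true) without write()");
          }
          return Optional.of(mapStatus);
        } else {
          mapStatus = null;                      // failure path: discard any output
          return Optional.empty();
        }
      }

      public static void main(String[] args) {
        TwoPhaseStopDemo writer = new TwoPhaseStopDemo();
        writer.write();
        System.out.println(writer.stop(true));   // Optional[map-status]
        System.out.println(writer.stop(false));  // Optional.empty -- guarded, no double cleanup
      }
    }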
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/PackedRecordPointer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/PackedRecordPointer.java b/core/src/main/java/org/apache/spark/shuffle/sort/PackedRecordPointer.java
new file mode 100644
index 0000000..c117119
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/PackedRecordPointer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+/**
+ * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
+ * <p>
+ * Within the long, the data is laid out as follows:
+ * <pre>
+ * [24 bit partition number][13 bit memory page number][27 bit offset in page]
+ * </pre>
+ * This implies that the maximum addressable page size is 2^27 bytes = 128 megabytes, assuming that
+ * our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
+ * 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
+ * implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
+ * <p>
+ * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
+ * optimization to future work as it will require more careful design to ensure that addresses are
+ * properly aligned (e.g. by padding records).
+ */
+final class PackedRecordPointer {
+
+ static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes
+
+ /**
+ * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
+ */
+ static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215
+
+ /** Bit mask for the lower 40 bits of a long. */
+ private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;
+
+ /** Bit mask for the upper 24 bits of a long */
+ private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;
+
+ /** Bit mask for the lower 27 bits of a long. */
+ private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;
+
+ /** Bit mask for the lower 51 bits of a long. */
+ private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;
+
+ /** Bit mask for the upper 13 bits of a long */
+ private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
+
+ /**
+ * Pack a record address and partition id into a single word.
+ *
+ * @param recordPointer a record pointer encoded by TaskMemoryManager.
+ * @param partitionId a shuffle partition id (maximum value of 2^24 - 1).
+ * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
+ */
+ public static long packPointer(long recordPointer, int partitionId) {
+ assert (partitionId <= MAXIMUM_PARTITION_ID);
+ // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
+ // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
+ final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
+ final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
+ return (((long) partitionId) << 40) | compressedAddress;
+ }
+
+ private long packedRecordPointer;
+
+ public void set(long packedRecordPointer) {
+ this.packedRecordPointer = packedRecordPointer;
+ }
+
+ public int getPartitionId() {
+ return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
+ }
+
+ public long getRecordPointer() {
+ final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
+ final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
+ return pageNumber | offsetInPage;
+ }
+
+}
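
A standalone round-trip sketch of the bit layout described above (the masks are restated inline; illustrative only, not part of this patch):

    final class PackedPointerDemo {
      public static void main(String[] args) {
        final long pageNumber = 5L;            // fits in 13 bits
        final long offsetInPage = 1234L;       // fits in 27 bits
        final int partitionId = 42;            // fits in 24 bits

        // TaskMemoryManager-style address: page number in the upper 13 bits.
        final long recordPointer = (pageNumber << 51) | offsetInPage;

        // Pack: compress the address to 40 bits, prepend the partition id.
        final long upper13 = ~((1L << 51) - 1);
        final long lower27 = (1L << 27) - 1;
        final long compressed = ((recordPointer & upper13) >>> 24) | (recordPointer & lower27);
        final long packed = (((long) partitionId) << 40) | compressed;

        // Unpack and verify the round trip.
        final int outPartition = (int) (packed >>> 40);
        final long outPointer = ((packed << 24) & upper13) | (packed & lower27);
        System.out.println(outPartition == partitionId);  // true
        System.out.println(outPointer == recordPointer);  // true
      }
    }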
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
new file mode 100644
index 0000000..85fdaa8
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+
+import scala.Tuple2;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.serializer.DummySerializerInstance;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleMemoryManager;
+import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.DiskBlockObjectWriter;
+import org.apache.spark.storage.TempShuffleBlockId;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import org.apache.spark.util.Utils;
+
+/**
+ * An external sorter that is specialized for sort-based shuffle.
+ * <p>
+ * Incoming records are appended to data pages. When all records have been inserted (or when the
+ * current thread's shuffle memory limit is reached), the in-memory records are sorted according to
+ * their partition ids (using a {@link ShuffleInMemorySorter}). The sorted records are then
+ * written to a single output file (or multiple files, if we've spilled). The format of the output
+ * files is the same as the format of the final output file written by
+ * {@link org.apache.spark.shuffle.sort.SortShuffleWriter}: each output partition's records are
+ * written as a single serialized, compressed stream that can be read with a new decompression and
+ * deserialization stream.
+ * <p>
+ * Unlike {@link org.apache.spark.util.collection.ExternalSorter}, this sorter does not merge its
+ * spill files. Instead, this merging is performed in {@link UnsafeShuffleWriter}, which uses a
+ * specialized merge procedure that avoids extra serialization/deserialization.
+ */
+final class ShuffleExternalSorter {
+
+ private final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);
+
+ @VisibleForTesting
+ static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
+
+ private final int initialSize;
+ private final int numPartitions;
+ private final int pageSizeBytes;
+ @VisibleForTesting
+ final int maxRecordSizeBytes;
+ private final TaskMemoryManager taskMemoryManager;
+ private final ShuffleMemoryManager shuffleMemoryManager;
+ private final BlockManager blockManager;
+ private final TaskContext taskContext;
+ private final ShuffleWriteMetrics writeMetrics;
+ private long numRecordsInsertedSinceLastSpill = 0;
+
+ /** Force this sorter to spill when there are this many elements in memory. For testing only */
+ private final long numElementsForSpillThreshold;
+
+ /** The buffer size to use when writing spills using DiskBlockObjectWriter */
+ private final int fileBufferSizeBytes;
+
+ /**
+ * Memory pages that hold the records being sorted. The pages in this list are freed when
+ * spilling, although in principle we could recycle these pages across spills (on the other hand,
+ * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
+ * itself).
+ */
+ private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();
+
+ private final LinkedList<SpillInfo> spills = new LinkedList<SpillInfo>();
+
+ /** Peak memory used by this sorter so far, in bytes. **/
+ private long peakMemoryUsedBytes;
+
+ // These variables are reset after spilling:
+ @Nullable private ShuffleInMemorySorter inMemSorter;
+ @Nullable private MemoryBlock currentPage = null;
+ private long currentPagePosition = -1;
+ private long freeSpaceInCurrentPage = 0;
+
+ public ShuffleExternalSorter(
+ TaskMemoryManager memoryManager,
+ ShuffleMemoryManager shuffleMemoryManager,
+ BlockManager blockManager,
+ TaskContext taskContext,
+ int initialSize,
+ int numPartitions,
+ SparkConf conf,
+ ShuffleWriteMetrics writeMetrics) throws IOException {
+ this.taskMemoryManager = memoryManager;
+ this.shuffleMemoryManager = shuffleMemoryManager;
+ this.blockManager = blockManager;
+ this.taskContext = taskContext;
+ this.initialSize = initialSize;
+ this.peakMemoryUsedBytes = initialSize;
+ this.numPartitions = numPartitions;
+ // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+ this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+ this.numElementsForSpillThreshold =
+ conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MAX_VALUE);
+ this.pageSizeBytes = (int) Math.min(
+ PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());
+ this.maxRecordSizeBytes = pageSizeBytes - 4;
+ this.writeMetrics = writeMetrics;
+ initializeForWriting();
+
+ // preserve first page to ensure that we have at least one page to work with. Otherwise,
+ // other operators in the same task may starve this sorter (SPARK-9709).
+ acquireNewPageIfNecessary(pageSizeBytes);
+ }
+
+ /**
+ * Allocates new sort data structures. Called when creating the sorter and after each spill.
+ */
+ private void initializeForWriting() throws IOException {
+ // TODO: move this sizing calculation logic into a static method of sorter:
+ final long memoryRequested = initialSize * 8L;
+ final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
+ if (memoryAcquired != memoryRequested) {
+ shuffleMemoryManager.release(memoryAcquired);
+ throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
+ }
+
+ this.inMemSorter = new ShuffleInMemorySorter(initialSize);
+ numRecordsInsertedSinceLastSpill = 0;
+ }
+
+ /**
+ * Sorts the in-memory records and writes the sorted records to an on-disk file.
+ * This method does not free the sort data structures.
+ *
+ * @param isLastFile if true, this indicates that we're writing the final output file and that the
+ * bytes written should be counted towards shuffle write metrics rather than
+ * shuffle spill metrics.
+ */
+ private void writeSortedFile(boolean isLastFile) throws IOException {
+
+ final ShuffleWriteMetrics writeMetricsToUse;
+
+ if (isLastFile) {
+ // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
+ writeMetricsToUse = writeMetrics;
+ } else {
+ // We're spilling, so bytes written should be counted towards spill rather than write.
+ // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
+ // them towards shuffle bytes written.
+ writeMetricsToUse = new ShuffleWriteMetrics();
+ }
+
+ // This call performs the actual sort.
+ final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
+ inMemSorter.getSortedIterator();
+
+ // Currently, we need to open a new DiskBlockObjectWriter for each partition; we can avoid this
+ // after SPARK-5581 is fixed.
+ DiskBlockObjectWriter writer;
+
+ // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
+ // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
+ // data through a byte array. This array does not need to be large enough to hold a single
+ // record; records larger than the buffer are copied to disk in multiple chunks.
+ final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
+
+ // Because this output will be read during shuffle, its compression codec must be controlled by
+ // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
+ // createTempShuffleBlock here; see SPARK-3426 for more details.
+ final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
+ blockManager.diskBlockManager().createTempShuffleBlock();
+ final File file = spilledFileInfo._2();
+ final TempShuffleBlockId blockId = spilledFileInfo._1();
+ final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);
+
+ // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
+ // Our write path doesn't actually use this serializer (since we end up calling the `write()`
+ // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
+ // around this, we pass a dummy no-op serializer.
+ final SerializerInstance ser = DummySerializerInstance.INSTANCE;
+
+ writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
+
+ int currentPartition = -1;
+ while (sortedRecords.hasNext()) {
+ sortedRecords.loadNext();
+ final int partition = sortedRecords.packedRecordPointer.getPartitionId();
+ assert (partition >= currentPartition);
+ if (partition != currentPartition) {
+ // Switch to the new partition
+ if (currentPartition != -1) {
+ writer.commitAndClose();
+ spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
+ }
+ currentPartition = partition;
+ writer =
+ blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse);
+ }
+
+ final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
+ final Object recordPage = taskMemoryManager.getPage(recordPointer);
+ final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
+ int dataRemaining = Platform.getInt(recordPage, recordOffsetInPage);
+ long recordReadPosition = recordOffsetInPage + 4; // skip over record length
+ while (dataRemaining > 0) {
+ final int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
+ Platform.copyMemory(
+ recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
+ writer.write(writeBuffer, 0, toTransfer);
+ recordReadPosition += toTransfer;
+ dataRemaining -= toTransfer;
+ }
+ writer.recordWritten();
+ }
+
+ if (writer != null) {
+ writer.commitAndClose();
+ // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
+ // then the file might be empty. Note that it might be better to avoid calling
+ // writeSortedFile() in that case.
+ if (currentPartition != -1) {
+ spillInfo.partitionLengths[currentPartition] = writer.fileSegment().length();
+ spills.add(spillInfo);
+ }
+ }
+
+ if (!isLastFile) { // i.e. this is a spill file
+ // The current semantics of `shuffleRecordsWritten` seem to be that it's updated when records
+ // are written to disk, not when they enter the shuffle sorting code. DiskBlockObjectWriter
+ // relies on its `recordWritten()` method being called in order to trigger periodic updates to
+ // `shuffleBytesWritten`. If we were to remove the `recordWritten()` call and increment that
+ // counter at a higher-level, then the in-progress metrics for records written and bytes
+ // written would get out of sync.
+ //
+ // When writing the last file, we pass `writeMetrics` directly to the DiskBlockObjectWriter;
+ // in all other cases, we pass in a dummy write metrics to capture metrics, then copy those
+ // metrics to the true write metrics here. The reason for performing this copying is so that
+ // we can avoid reporting spilled bytes as shuffle write bytes.
+ //
+ // Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
+ // Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
+ // This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
+ writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
+ taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+ }
+ }
+
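
    // A standalone sketch of the chunked copy loop in writeSortedFile() above:
    // a record in a memory page is streamed to the writer through a fixed-size
    // transfer buffer, so the buffer never needs to hold a whole record.
    // Illustrative only, not part of this patch.
    final class ChunkedCopyDemo {
      static final int DISK_WRITE_BUFFER_SIZE = 8;   // tiny, to force several chunks

      public static void main(String[] args) {
        byte[] page = new byte[30];                  // stands in for a record in a data page
        for (int i = 0; i < page.length; i++) page[i] = (byte) i;

        byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE];
        java.io.ByteArrayOutputStream writer =
            new java.io.ByteArrayOutputStream();     // stands in for DiskBlockObjectWriter

        int dataRemaining = page.length;
        int recordReadPosition = 0;
        while (dataRemaining > 0) {
          int toTransfer = Math.min(DISK_WRITE_BUFFER_SIZE, dataRemaining);
          System.arraycopy(page, recordReadPosition, writeBuffer, 0, toTransfer);
          writer.write(writeBuffer, 0, toTransfer);
          recordReadPosition += toTransfer;
          dataRemaining -= toTransfer;
        }
        System.out.println(writer.size() == page.length);  // true
      }
    }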
+ /**
+ * Sort and spill the current records in response to memory pressure.
+ */
+ @VisibleForTesting
+ void spill() throws IOException {
+ logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
+ Thread.currentThread().getId(),
+ Utils.bytesToString(getMemoryUsage()),
+ spills.size(),
+ spills.size() > 1 ? "times" : "time");
+
+ writeSortedFile(false);
+ final long inMemSorterMemoryUsage = inMemSorter.getMemoryUsage();
+ inMemSorter = null;
+ shuffleMemoryManager.release(inMemSorterMemoryUsage);
+ final long spillSize = freeMemory();
+ taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
+
+ initializeForWriting();
+ }
+
+ private long getMemoryUsage() {
+ long totalPageSize = 0;
+ for (MemoryBlock page : allocatedPages) {
+ totalPageSize += page.size();
+ }
+ return ((inMemSorter == null) ? 0 : inMemSorter.getMemoryUsage()) + totalPageSize;
+ }
+
+ private void updatePeakMemoryUsed() {
+ long mem = getMemoryUsage();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
+ }
+ }
+
+ /**
+ * Return the peak memory used so far, in bytes.
+ */
+ long getPeakMemoryUsedBytes() {
+ updatePeakMemoryUsed();
+ return peakMemoryUsedBytes;
+ }
+
+ private long freeMemory() {
+ updatePeakMemoryUsed();
+ long memoryFreed = 0;
+ for (MemoryBlock block : allocatedPages) {
+ taskMemoryManager.freePage(block);
+ shuffleMemoryManager.release(block.size());
+ memoryFreed += block.size();
+ }
+ allocatedPages.clear();
+ currentPage = null;
+ currentPagePosition = -1;
+ freeSpaceInCurrentPage = 0;
+ return memoryFreed;
+ }
+
+ /**
+ * Force all memory and spill files to be deleted; called by shuffle error-handling code.
+ */
+ public void cleanupResources() {
+ freeMemory();
+ for (SpillInfo spill : spills) {
+ if (spill.file.exists() && !spill.file.delete()) {
+ logger.error("Unable to delete spill file {}", spill.file.getPath());
+ }
+ }
+ if (inMemSorter != null) {
+ shuffleMemoryManager.release(inMemSorter.getMemoryUsage());
+ inMemSorter = null;
+ }
+ }
+
+ /**
+ * Checks whether there is enough space to insert an additional record into the sort pointer
+ * array and grows the array if additional space is required. If the required space cannot be
+ * obtained, then the in-memory data will be spilled to disk.
+ */
+ private void growPointerArrayIfNecessary() throws IOException {
+ assert(inMemSorter != null);
+ if (!inMemSorter.hasSpaceForAnotherRecord()) {
+ logger.debug("Attempting to expand sort pointer array");
+ final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
+ final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
+ final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
+ if (memoryAcquired < memoryToGrowPointerArray) {
+ shuffleMemoryManager.release(memoryAcquired);
+ spill();
+ } else {
+ inMemSorter.expandPointerArray();
+ shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
+ }
+ }
+ }
+
+ /**
+ * Allocates more memory in order to insert an additional record. This will request additional
+ * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
+ * obtained.
+ *
+ * @param requiredSpace the required space in the data page, in bytes, including space for storing
+ * the record size. This must be less than or equal to the page size (records
+ * that exceed the page size are handled via a different code path which uses
+ * special overflow pages).
+ */
+ private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
+ growPointerArrayIfNecessary();
+ if (requiredSpace > freeSpaceInCurrentPage) {
+ logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
+ freeSpaceInCurrentPage);
+ // TODO: we should track metrics on the amount of space wasted when we roll over to a new page
+ // without using the free space at the end of the current page. We should also do this for
+ // BytesToBytesMap.
+ if (requiredSpace > pageSizeBytes) {
+ throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
+ pageSizeBytes + ")");
+ } else {
+ final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryAcquired < pageSizeBytes) {
+ shuffleMemoryManager.release(memoryAcquired);
+ spill();
+ final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryAcquiredAfterSpilling != pageSizeBytes) {
+ shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
+ throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
+ }
+ }
+ currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
+ currentPagePosition = currentPage.getBaseOffset();
+ freeSpaceInCurrentPage = pageSizeBytes;
+ allocatedPages.add(currentPage);
+ }
+ }
+ }
+
+ /**
+ * Write a record to the shuffle sorter.
+ */
+ public void insertRecord(
+ Object recordBaseObject,
+ long recordBaseOffset,
+ int lengthInBytes,
+ int partitionId) throws IOException {
+
+ if (numRecordsInsertedSinceLastSpill > numElementsForSpillThreshold) {
+ spill();
+ }
+
+ growPointerArrayIfNecessary();
+ // Need 4 bytes to store the record length.
+ final int totalSpaceRequired = lengthInBytes + 4;
+
+ // --- Figure out where to insert the new record ----------------------------------------------
+
+ final MemoryBlock dataPage;
+ long dataPagePosition;
+ boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
+ if (useOverflowPage) {
+ long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
+ // The record is larger than the page size, so allocate a special overflow page just to hold
+ // that record.
+ final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGranted != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGranted);
+ spill();
+ final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGrantedAfterSpill != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGrantedAfterSpill);
+ throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
+ }
+ }
+ MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
+ allocatedPages.add(overflowPage);
+ dataPage = overflowPage;
+ dataPagePosition = overflowPage.getBaseOffset();
+ } else {
+ // The record is small enough to fit in a regular data page, but the current page might not
+ // have enough space to hold it (or no pages have been allocated yet).
+ acquireNewPageIfNecessary(totalSpaceRequired);
+ dataPage = currentPage;
+ dataPagePosition = currentPagePosition;
+ // Update bookkeeping information
+ freeSpaceInCurrentPage -= totalSpaceRequired;
+ currentPagePosition += totalSpaceRequired;
+ }
+ final Object dataPageBaseObject = dataPage.getBaseObject();
+
+ final long recordAddress =
+ taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
+ Platform.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
+ dataPagePosition += 4;
+ Platform.copyMemory(
+ recordBaseObject, recordBaseOffset, dataPageBaseObject, dataPagePosition, lengthInBytes);
+ assert(inMemSorter != null);
+ inMemSorter.insertRecord(recordAddress, partitionId);
+ numRecordsInsertedSinceLastSpill += 1;
+ }
+
+ /**
+ * Close the sorter, causing any buffered data to be sorted and written out to disk.
+ *
+ * @return metadata for the spill files written by this sorter. If no records were ever inserted
+ * into this sorter, then this will return an empty array.
+ * @throws IOException
+ */
+ public SpillInfo[] closeAndGetSpills() throws IOException {
+ try {
+ if (inMemSorter != null) {
+ // Do not count the final file towards the spill count.
+ writeSortedFile(true);
+ freeMemory();
+ }
+ return spills.toArray(new SpillInfo[spills.size()]);
+ } catch (IOException e) {
+ cleanupResources();
+ throw e;
+ }
+ }
+
+}
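
acquireNewPageIfNecessary() and the overflow-page branch of insertRecord() above follow the same acquire-or-spill pattern: try to reserve memory, give back any partial grant, spill to free memory, then retry once before failing. A standalone sketch of that pattern (illustrative stand-ins, not Spark's ShuffleMemoryManager):

    import java.io.IOException;

    final class AcquireOrSpillDemo {
      private long available = 100;                  // stands in for ShuffleMemoryManager
      private long used = 80;                        // memory we could free by spilling

      long tryToAcquire(long requested) {
        long granted = Math.min(requested, available);
        available -= granted;
        return granted;
      }
      void release(long bytes) { available += bytes; }
      void spill() { release(used); used = 0; }      // pretend we wrote data to disk

      void acquireOrSpill(long pageSize) throws IOException {
        long acquired = tryToAcquire(pageSize);
        if (acquired < pageSize) {
          release(acquired);                         // give back the partial grant
          spill();                                   // free memory by spilling to disk
          long retry = tryToAcquire(pageSize);
          if (retry != pageSize) {
            release(retry);
            throw new IOException("Unable to acquire " + pageSize + " bytes of memory");
          }
        }
        used += pageSize;
      }

      public static void main(String[] args) throws IOException {
        AcquireOrSpillDemo m = new AcquireOrSpillDemo();
        m.acquireOrSpill(150);                       // forces a spill, then succeeds
        System.out.println("acquired after spill");
      }
    }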
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
new file mode 100644
index 0000000..a8dee6c
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import java.util.Comparator;
+
+import org.apache.spark.util.collection.Sorter;
+
+final class ShuffleInMemorySorter {
+
+ private final Sorter<PackedRecordPointer, long[]> sorter;
+ private static final class SortComparator implements Comparator<PackedRecordPointer> {
+ @Override
+ public int compare(PackedRecordPointer left, PackedRecordPointer right) {
+ return left.getPartitionId() - right.getPartitionId();
+ }
+ }
+ private static final SortComparator SORT_COMPARATOR = new SortComparator();
+
+ /**
+ * An array of record pointers and partition ids that have been encoded by
+ * {@link PackedRecordPointer}. The sort operates on this array instead of directly manipulating
+ * records.
+ */
+ private long[] pointerArray;
+
+ /**
+ * The position in the pointer array where new records can be inserted.
+ */
+ private int pointerArrayInsertPosition = 0;
+
+ public ShuffleInMemorySorter(int initialSize) {
+ assert (initialSize > 0);
+ this.pointerArray = new long[initialSize];
+ this.sorter = new Sorter<PackedRecordPointer, long[]>(ShuffleSortDataFormat.INSTANCE);
+ }
+
+ public void expandPointerArray() {
+ final long[] oldArray = pointerArray;
+ // Guard against overflow:
+ final int newLength = oldArray.length * 2 > 0 ? (oldArray.length * 2) : Integer.MAX_VALUE;
+ pointerArray = new long[newLength];
+ System.arraycopy(oldArray, 0, pointerArray, 0, oldArray.length);
+ }
+
+ public boolean hasSpaceForAnotherRecord() {
+ return pointerArrayInsertPosition + 1 < pointerArray.length;
+ }
+
+ public long getMemoryUsage() {
+ return pointerArray.length * 8L;
+ }
+
+ /**
+ * Inserts a record to be sorted.
+ *
+ * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
+ * certain pointer compression techniques used by the sorter, the sort can
+ * only operate on pointers that point to locations in the first
+ * {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
+ * @param partitionId the partition id, which must be less than or equal to
+ * {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
+ */
+ public void insertRecord(long recordPointer, int partitionId) {
+ if (!hasSpaceForAnotherRecord()) {
+ if (pointerArray.length == Integer.MAX_VALUE) {
+ throw new IllegalStateException("Sort pointer array has reached maximum size");
+ } else {
+ expandPointerArray();
+ }
+ }
+ pointerArray[pointerArrayInsertPosition] =
+ PackedRecordPointer.packPointer(recordPointer, partitionId);
+ pointerArrayInsertPosition++;
+ }
+
+ /**
+ * An iterator-like class that's used instead of Java's Iterator in order to facilitate inlining.
+ */
+ public static final class ShuffleSorterIterator {
+
+ private final long[] pointerArray;
+ private final int numRecords;
+ final PackedRecordPointer packedRecordPointer = new PackedRecordPointer();
+ private int position = 0;
+
+ public ShuffleSorterIterator(int numRecords, long[] pointerArray) {
+ this.numRecords = numRecords;
+ this.pointerArray = pointerArray;
+ }
+
+ public boolean hasNext() {
+ return position < numRecords;
+ }
+
+ public void loadNext() {
+ packedRecordPointer.set(pointerArray[position]);
+ position++;
+ }
+ }
+
+ /**
+ * Return an iterator over record pointers in sorted order.
+ */
+ public ShuffleSorterIterator getSortedIterator() {
+ sorter.sort(pointerArray, 0, pointerArrayInsertPosition, SORT_COMPARATOR);
+ return new ShuffleSorterIterator(pointerArrayInsertPosition, pointerArray);
+ }
+}
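
Because the partition id occupies the most significant bits of each packed word, ordering the packed longs groups records by partition without touching the records themselves. A standalone sketch (small partition ids keep the sign bit clear, so a plain sort suffices here; the sorter above uses an explicit comparator on getPartitionId() instead):

    import java.util.Arrays;

    final class PartitionSortDemo {
      public static void main(String[] args) {
        long[] packed = {
            (3L << 40) | 111,   // partition 3
            (1L << 40) | 222,   // partition 1
            (2L << 40) | 333,   // partition 2
        };
        Arrays.sort(packed);    // groups records by partition id
        for (long p : packed) {
          System.out.println("partition=" + (p >>> 40) + " pointer=" + (p & ((1L << 40) - 1)));
        }
        // prints partitions 1, 2, 3 in order
      }
    }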
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
new file mode 100644
index 0000000..8a1e5ae
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleSortDataFormat.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import org.apache.spark.util.collection.SortDataFormat;
+
+final class ShuffleSortDataFormat extends SortDataFormat<PackedRecordPointer, long[]> {
+
+ public static final ShuffleSortDataFormat INSTANCE = new ShuffleSortDataFormat();
+
+ private ShuffleSortDataFormat() { }
+
+ @Override
+ public PackedRecordPointer getKey(long[] data, int pos) {
+ // Since we re-use keys, this method shouldn't be called.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PackedRecordPointer newKey() {
+ return new PackedRecordPointer();
+ }
+
+ @Override
+ public PackedRecordPointer getKey(long[] data, int pos, PackedRecordPointer reuse) {
+ reuse.set(data[pos]);
+ return reuse;
+ }
+
+ @Override
+ public void swap(long[] data, int pos0, int pos1) {
+ final long temp = data[pos0];
+ data[pos0] = data[pos1];
+ data[pos1] = temp;
+ }
+
+ @Override
+ public void copyElement(long[] src, int srcPos, long[] dst, int dstPos) {
+ dst[dstPos] = src[srcPos];
+ }
+
+ @Override
+ public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length) {
+ System.arraycopy(src, srcPos, dst, dstPos, length);
+ }
+
+ @Override
+ public long[] allocate(int length) {
+ return new long[length];
+ }
+
+}
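
The getKey(data, pos, reuse) overload above exists so that sorting never allocates a key object per comparison: a single mutable PackedRecordPointer is reset to each word as it is examined, which is why the no-reuse getKey() throws. A standalone sketch of the reuse pattern (illustrative names, not part of this patch):

    final class ReuseKeyDemo {
      static final class MutableKey {
        long word;
        void set(long w) { word = w; }
        int partitionId() { return (int) (word >>> 40); }
      }

      public static void main(String[] args) {
        long[] data = { (2L << 40) | 7, (1L << 40) | 9 };
        MutableKey reuse = new MutableKey();        // allocated once, reused per element
        for (int pos = 0; pos < data.length; pos++) {
          reuse.set(data[pos]);                     // analogue of getKey(data, pos, reuse)
          System.out.println(reuse.partitionId()); // 2, then 1
        }
      }
    }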
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
deleted file mode 100644
index 656ea04..0000000
--- a/core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.sort;
-
-import java.io.File;
-import java.io.IOException;
-
-import scala.Product2;
-import scala.collection.Iterator;
-
-import org.apache.spark.annotation.Private;
-import org.apache.spark.TaskContext;
-import org.apache.spark.storage.BlockId;
-
-/**
- * Interface for objects that {@link SortShuffleWriter} uses to write its output files.
- */
-@Private
-public interface SortShuffleFileWriter<K, V> {
-
- void insertAll(Iterator<Product2<K, V>> records) throws IOException;
-
- /**
- * Write all the data added into this shuffle sorter into a file in the disk store. This is
- * called by the SortShuffleWriter and can go through an efficient path of just concatenating
- * binary files if we decided to avoid merge-sorting.
- *
- * @param blockId block ID to write to. The index file will be blockId.name + ".index".
- * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
- * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
- */
- long[] writePartitionedFile(
- BlockId blockId,
- TaskContext context,
- File outputFile) throws IOException;
-
- void stop() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java b/core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java
new file mode 100644
index 0000000..df9f7b7
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import java.io.File;
+
+import org.apache.spark.storage.TempShuffleBlockId;
+
+/**
+ * Metadata for a block of data written by {@link ShuffleExternalSorter}.
+ */
+final class SpillInfo {
+ final long[] partitionLengths;
+ final File file;
+ final TempShuffleBlockId blockId;
+
+ public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
+ this.partitionLengths = new long[numPartitions];
+ this.file = file;
+ this.blockId = blockId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
new file mode 100644
index 0000000..e8f050c
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import javax.annotation.Nullable;
+import java.io.*;
+import java.nio.channels.FileChannel;
+import java.util.Iterator;
+
+import scala.Option;
+import scala.Product2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Map;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.*;
+import org.apache.spark.annotation.Private;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.io.CompressionCodec;
+import org.apache.spark.io.CompressionCodec$;
+import org.apache.spark.io.LZFCompressionCodec;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.scheduler.MapStatus$;
+import org.apache.spark.serializer.SerializationStream;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.IndexShuffleBlockResolver;
+import org.apache.spark.shuffle.ShuffleMemoryManager;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.TimeTrackingOutputStream;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.memory.TaskMemoryManager;
+
+@Private
+public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
+
+ private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleWriter.class);
+
+ private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();
+
+ @VisibleForTesting
+ static final int INITIAL_SORT_BUFFER_SIZE = 4096;
+
+ private final BlockManager blockManager;
+ private final IndexShuffleBlockResolver shuffleBlockResolver;
+ private final TaskMemoryManager memoryManager;
+ private final ShuffleMemoryManager shuffleMemoryManager;
+ private final SerializerInstance serializer;
+ private final Partitioner partitioner;
+ private final ShuffleWriteMetrics writeMetrics;
+ private final int shuffleId;
+ private final int mapId;
+ private final TaskContext taskContext;
+ private final SparkConf sparkConf;
+ private final boolean transferToEnabled;
+
+ @Nullable private MapStatus mapStatus;
+ @Nullable private ShuffleExternalSorter sorter;
+ private long peakMemoryUsedBytes = 0;
+
+ /** Subclass of ByteArrayOutputStream that exposes `buf` directly. */
+ private static final class MyByteArrayOutputStream extends ByteArrayOutputStream {
+ public MyByteArrayOutputStream(int size) { super(size); }
+ public byte[] getBuf() { return buf; }
+ }
+
+ private MyByteArrayOutputStream serBuffer;
+ private SerializationStream serOutputStream;
+
+ /**
+ * Are we in the process of stopping? Because map tasks can call stop() with success = true
+ * and then call stop() with success = false if they get an exception, we want to make sure
+ * we don't try deleting files, etc. twice.
+ */
+ private boolean stopping = false;
+
+ public UnsafeShuffleWriter(
+ BlockManager blockManager,
+ IndexShuffleBlockResolver shuffleBlockResolver,
+ TaskMemoryManager memoryManager,
+ ShuffleMemoryManager shuffleMemoryManager,
+ SerializedShuffleHandle<K, V> handle,
+ int mapId,
+ TaskContext taskContext,
+ SparkConf sparkConf) throws IOException {
+ final int numPartitions = handle.dependency().partitioner().numPartitions();
+ if (numPartitions > SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE()) {
+ throw new IllegalArgumentException(
+ "UnsafeShuffleWriter can only be used for shuffles with at most " +
+ SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE() + " reduce partitions");
+ }
+ this.blockManager = blockManager;
+ this.shuffleBlockResolver = shuffleBlockResolver;
+ this.memoryManager = memoryManager;
+ this.shuffleMemoryManager = shuffleMemoryManager;
+ this.mapId = mapId;
+ final ShuffleDependency<K, V, V> dep = handle.dependency();
+ this.shuffleId = dep.shuffleId();
+ this.serializer = Serializer.getSerializer(dep.serializer()).newInstance();
+ this.partitioner = dep.partitioner();
+ this.writeMetrics = new ShuffleWriteMetrics();
+ taskContext.taskMetrics().shuffleWriteMetrics_$eq(Option.apply(writeMetrics));
+ this.taskContext = taskContext;
+ this.sparkConf = sparkConf;
+ this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
+ open();
+ }
+
+ @VisibleForTesting
+ public int maxRecordSizeBytes() {
+ assert(sorter != null);
+ return sorter.maxRecordSizeBytes;
+ }
+
+ private void updatePeakMemoryUsed() {
+ // sorter can be null if this writer is closed
+ if (sorter != null) {
+ long mem = sorter.getPeakMemoryUsedBytes();
+ if (mem > peakMemoryUsedBytes) {
+ peakMemoryUsedBytes = mem;
+ }
+ }
+ }
+
+ /**
+ * Return the peak memory used so far, in bytes.
+ */
+ public long getPeakMemoryUsedBytes() {
+ updatePeakMemoryUsed();
+ return peakMemoryUsedBytes;
+ }
+
+ /**
+ * This convenience method should only be called in test code.
+ */
+ @VisibleForTesting
+ public void write(Iterator<Product2<K, V>> records) throws IOException {
+ write(JavaConverters.asScalaIteratorConverter(records).asScala());
+ }
+
+ @Override
+ public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
+ // Keep track of success so we know if we encountered an exception
+ // We do this rather than a standard try/catch/re-throw to handle
+ // generic throwables.
+ boolean success = false;
+ try {
+ while (records.hasNext()) {
+ insertRecordIntoSorter(records.next());
+ }
+ closeAndWriteOutput();
+ success = true;
+ } finally {
+ if (sorter != null) {
+ try {
+ sorter.cleanupResources();
+ } catch (Exception e) {
+ // Only throw this error if we won't be masking another
+ // error.
+ if (success) {
+ throw e;
+ } else {
+ logger.error("In addition to a failure during writing, we failed during " +
+ "cleanup.", e);
+ }
+ }
+ }
+ }
+ }
+
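
    // A standalone sketch of the cleanup pattern in write() above: remember
    // whether the main path succeeded so that a failure during cleanup never
    // masks the original exception. The demo intentionally ends by propagating
    // the cleanup exception. Illustrative only, not part of this patch.
    final class CleanupMaskingDemo {
      static void doWork() { /* main path succeeds here */ }
      static void cleanup() { throw new RuntimeException("cleanup failed"); }

      public static void main(String[] args) {
        boolean success = false;
        try {
          doWork();
          success = true;
        } finally {
          try {
            cleanup();
          } catch (Exception e) {
            if (success) {
              throw e;              // nothing to mask; propagate the cleanup failure
            } else {
              System.err.println("also failed during cleanup: " + e);
            }
          }
        }
      }
    }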
+ private void open() throws IOException {
+ assert (sorter == null);
+ sorter = new ShuffleExternalSorter(
+ memoryManager,
+ shuffleMemoryManager,
+ blockManager,
+ taskContext,
+ INITIAL_SORT_BUFFER_SIZE,
+ partitioner.numPartitions(),
+ sparkConf,
+ writeMetrics);
+ serBuffer = new MyByteArrayOutputStream(1024 * 1024);
+ serOutputStream = serializer.serializeStream(serBuffer);
+ }
+
+ @VisibleForTesting
+ void closeAndWriteOutput() throws IOException {
+ assert(sorter != null);
+ updatePeakMemoryUsed();
+ serBuffer = null;
+ serOutputStream = null;
+ final SpillInfo[] spills = sorter.closeAndGetSpills();
+ sorter = null;
+ final long[] partitionLengths;
+ try {
+ partitionLengths = mergeSpills(spills);
+ } finally {
+ for (SpillInfo spill : spills) {
+ if (spill.file.exists() && ! spill.file.delete()) {
+ logger.error("Error while deleting spill file {}", spill.file.getPath());
+ }
+ }
+ }
+ shuffleBlockResolver.writeIndexFile(shuffleId, mapId, partitionLengths);
+ mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+ }
+
+ @VisibleForTesting
+ void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
+ assert(sorter != null);
+ final K key = record._1();
+ final int partitionId = partitioner.getPartition(key);
+ serBuffer.reset();
+ serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
+ serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
+ serOutputStream.flush();
+
+ final int serializedRecordSize = serBuffer.size();
+ assert (serializedRecordSize > 0);
+
+ sorter.insertRecord(
+ serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
+ }
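MyByteArrayOutputStream is defined elsewhere in this file; judging from the getBuf() call above, it presumably exists so the sorter can read the serialized bytes in place rather than paying the array copy that toByteArray() would make. A minimal sketch of that idea (the class name and constructor here are assumptions, not the patch's actual definition):

    import java.io.ByteArrayOutputStream;

    // Subclassing exposes the protected buf field; only the first size()
    // bytes hold valid data, which is why the caller also passes size().
    final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
      ExposedByteArrayOutputStream(int initialSize) { super(initialSize); }
      byte[] getBuf() { return buf; }
    }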
+
+ @VisibleForTesting
+ void forceSorterToSpill() throws IOException {
+ assert (sorter != null);
+ sorter.spill();
+ }
+
+ /**
+ * Merge zero or more spill files together, choosing the fastest merging strategy based on the
+ * number of spills and the IO compression codec.
+ *
+ * @return the partition lengths in the merged file.
+ */
+ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
+ final File outputFile = shuffleBlockResolver.getDataFile(shuffleId, mapId);
+ final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);
+ final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);
+ final boolean fastMergeEnabled =
+ sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
+ final boolean fastMergeIsSupported =
+ !compressionEnabled || compressionCodec instanceof LZFCompressionCodec;
+ try {
+ if (spills.length == 0) {
+ new FileOutputStream(outputFile).close(); // Create an empty file
+ return new long[partitioner.numPartitions()];
+ } else if (spills.length == 1) {
+ // Here, we don't need to perform any metrics updates because the bytes written to this
+ // output file would have already been counted as shuffle bytes written.
+ Files.move(spills[0].file, outputFile);
+ return spills[0].partitionLengths;
+ } else {
+ final long[] partitionLengths;
+ // There are multiple spills to merge, so none of these spill files' lengths were counted
+ // towards our shuffle write count or shuffle write time. If we use the slow merge path,
+ // then the final output file's size won't necessarily be equal to the sum of the spill
+ // files' sizes. To guard against this case, we look at the output file's actual size when
+ // computing shuffle bytes written.
+ //
+ // We allow the individual merge methods to report their own IO times since different merge
+ // strategies use different IO techniques. We count IO during merge towards the shuffle
+ // write time, which appears to be consistent with the "not bypassing merge-sort"
+ // branch in ExternalSorter.
+ if (fastMergeEnabled && fastMergeIsSupported) {
+ // Compression is disabled or we are using an IO compression codec that supports
+ // decompression of concatenated compressed streams, so we can perform a fast spill merge
+ // that doesn't need to interpret the spilled bytes.
+ if (transferToEnabled) {
+ logger.debug("Using transferTo-based fast merge");
+ partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
+ } else {
+ logger.debug("Using fileStream-based fast merge");
+ partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
+ }
+ } else {
+ logger.debug("Using slow merge");
+ partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
+ }
+ // When closing a ShuffleExternalSorter that has already spilled once but also has
+ // in-memory records, we write out the in-memory records to a file but do not count that
+ // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
+ // to be counted as shuffle write, but this will lead to double-counting of the final
+ // SpillInfo's bytes.
+ writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
+ writeMetrics.incShuffleBytesWritten(outputFile.length());
+ return partitionLengths;
+ }
+ } catch (IOException e) {
+ if (outputFile.exists() && !outputFile.delete()) {
+ logger.error("Unable to delete output file {}", outputFile.getPath());
+ }
+ throw e;
+ }
+ }
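To make the metrics correction at the end of mergeSpills concrete, here is the same arithmetic with made-up numbers:

    // Suppose three spills of 100, 120, and 80 bytes merge into a 300-byte
    // output file. Closing the sorter already counted the final 80-byte spill
    // as shuffle bytes written, so the merge corrects the totals:
    writeMetrics.decShuffleBytesWritten(80);   // un-count the last spill's flush
    writeMetrics.incShuffleBytesWritten(300);  // count the merged output once
    // Net shuffle bytes written for this map task: 300, with no double counting.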
+
+ /**
+ * Merges spill files using Java FileStreams. This code path is slower than the NIO-based merge,
+ * {@link UnsafeShuffleWriter#mergeSpillsWithTransferTo(SpillInfo[], File)}, so it's only used in
+ * cases where the IO compression codec does not support concatenation of compressed data, or in
+ * cases where users have explicitly disabled use of {@code transferTo} in order to work around
+ * kernel bugs.
+ *
+ * @param spills the spills to merge.
+ * @param outputFile the file to write the merged data to.
+ * @param compressionCodec the IO compression codec, or null if shuffle compression is disabled.
+ * @return the partition lengths in the merged file.
+ */
+ private long[] mergeSpillsWithFileStream(
+ SpillInfo[] spills,
+ File outputFile,
+ @Nullable CompressionCodec compressionCodec) throws IOException {
+ assert (spills.length >= 2);
+ final int numPartitions = partitioner.numPartitions();
+ final long[] partitionLengths = new long[numPartitions];
+ final InputStream[] spillInputStreams = new FileInputStream[spills.length];
+ OutputStream mergedFileOutputStream = null;
+
+ boolean threwException = true;
+ try {
+ for (int i = 0; i < spills.length; i++) {
+ spillInputStreams[i] = new FileInputStream(spills[i].file);
+ }
+ for (int partition = 0; partition < numPartitions; partition++) {
+ final long initialFileLength = outputFile.length();
+ mergedFileOutputStream =
+ new TimeTrackingOutputStream(writeMetrics, new FileOutputStream(outputFile, true));
+ if (compressionCodec != null) {
+ mergedFileOutputStream = compressionCodec.compressedOutputStream(mergedFileOutputStream);
+ }
+
+ for (int i = 0; i < spills.length; i++) {
+ final long partitionLengthInSpill = spills[i].partitionLengths[partition];
+ if (partitionLengthInSpill > 0) {
+ InputStream partitionInputStream =
+ new LimitedInputStream(spillInputStreams[i], partitionLengthInSpill);
+ if (compressionCodec != null) {
+ partitionInputStream = compressionCodec.compressedInputStream(partitionInputStream);
+ }
+ ByteStreams.copy(partitionInputStream, mergedFileOutputStream);
+ }
+ }
+ mergedFileOutputStream.flush();
+ mergedFileOutputStream.close();
+ partitionLengths[partition] = (outputFile.length() - initialFileLength);
+ }
+ threwException = false;
+ } finally {
+ // To avoid masking exceptions that caused us to prematurely enter the finally block, only
+ // throw exceptions during cleanup if threwException == false.
+ for (InputStream stream : spillInputStreams) {
+ Closeables.close(stream, threwException);
+ }
+ Closeables.close(mergedFileOutputStream, threwException);
+ }
+ return partitionLengths;
+ }
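LimitedInputStream caps how many bytes may be read from the wrapped stream, so each spill's read cursor stops exactly at the current partition's boundary and the next partition's pass resumes from there. A self-contained sketch of the same bounded copy (copyLimited is a hypothetical helper; java.io.InputStream, OutputStream, and IOException imports assumed):

    static long copyLimited(InputStream in, OutputStream out, long limit) throws IOException {
      final byte[] buffer = new byte[8192];
      long copied = 0;
      while (copied < limit) {
        int toRead = (int) Math.min(buffer.length, limit - copied);
        int read = in.read(buffer, 0, toRead);
        if (read == -1) {
          break;  // stream ended early; a well-formed spill file should not hit this
        }
        out.write(buffer, 0, read);
        copied += read;
      }
      return copied;  // number of bytes actually copied, at most `limit`
    }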
+
+ /**
+ * Merges spill files by using NIO's transferTo to concatenate spill partitions' bytes.
+ * This is only safe when the IO compression codec and serializer support concatenation of
+ * serialized streams.
+ *
+ * @return the partition lengths in the merged file.
+ */
+ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
+ assert (spills.length >= 2);
+ final int numPartitions = partitioner.numPartitions();
+ final long[] partitionLengths = new long[numPartitions];
+ final FileChannel[] spillInputChannels = new FileChannel[spills.length];
+ final long[] spillInputChannelPositions = new long[spills.length];
+ FileChannel mergedFileOutputChannel = null;
+
+ boolean threwException = true;
+ try {
+ for (int i = 0; i < spills.length; i++) {
+ spillInputChannels[i] = new FileInputStream(spills[i].file).getChannel();
+ }
+ // This file needs to be opened in append mode in order to work around a Linux kernel bug that
+ // affects transferTo; see SPARK-3948 for more details.
+ mergedFileOutputChannel = new FileOutputStream(outputFile, true).getChannel();
+
+ long bytesWrittenToMergedFile = 0;
+ for (int partition = 0; partition < numPartitions; partition++) {
+ for (int i = 0; i < spills.length; i++) {
+ final long partitionLengthInSpill = spills[i].partitionLengths[partition];
+ long bytesToTransfer = partitionLengthInSpill;
+ final FileChannel spillInputChannel = spillInputChannels[i];
+ final long writeStartTime = System.nanoTime();
+ while (bytesToTransfer > 0) {
+ final long actualBytesTransferred = spillInputChannel.transferTo(
+ spillInputChannelPositions[i],
+ bytesToTransfer,
+ mergedFileOutputChannel);
+ spillInputChannelPositions[i] += actualBytesTransferred;
+ bytesToTransfer -= actualBytesTransferred;
+ }
+ writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+ bytesWrittenToMergedFile += partitionLengthInSpill;
+ partitionLengths[partition] += partitionLengthInSpill;
+ }
+ }
+ // Check the output channel's position after the transferTo loop and raise an exception if
+ // it does not match the expected number of bytes written. On kernel version 2.6.32,
+ // transferTo does not advance the position to the expected length; this issue is described
+ // at https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
+ if (mergedFileOutputChannel.position() != bytesWrittenToMergedFile) {
+ throw new IOException(
+ "Current position " + mergedFileOutputChannel.position() + " does not equal expected " +
+ "position " + bytesWrittenToMergedFile + " after transferTo. Please check your kernel" +
+ " version to see if it is 2.6.32, as there is a kernel bug which will lead to " +
+ "unexpected behavior when using transferTo. You can set spark.file.transferTo=false " +
+ "to disable this NIO feature."
+ );
+ }
+ threwException = false;
+ } finally {
+ // To avoid masking exceptions that caused us to prematurely enter the finally block, only
+ // throw exceptions during cleanup if threwException == false.
+ for (int i = 0; i < spills.length; i++) {
+ assert(spillInputChannelPositions[i] == spills[i].file.length());
+ Closeables.close(spillInputChannels[i], threwException);
+ }
+ Closeables.close(mergedFileOutputChannel, threwException);
+ }
+ return partitionLengths;
+ }
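Stripped of metrics and per-partition bookkeeping, the transferTo merge reduces to channel-to-channel file concatenation. A minimal standalone sketch (concatenate is a hypothetical helper; java.io and java.nio.channels imports assumed) showing the two essentials, the append-mode FileOutputStream for the SPARK-3948 workaround and the loop, since transferTo may move fewer bytes than requested:

    static void concatenate(File[] inputs, File dest) throws IOException {
      // Append mode works around the Linux kernel transferTo bug (SPARK-3948).
      try (FileChannel out = new FileOutputStream(dest, true).getChannel()) {
        for (File input : inputs) {
          try (FileChannel in = new FileInputStream(input).getChannel()) {
            long pos = 0;
            final long size = in.size();
            while (pos < size) {
              // transferTo returns the number of bytes actually transferred,
              // which may be less than requested.
              pos += in.transferTo(pos, size - pos, out);
            }
          }
        }
      }
    }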
+
+ @Override
+ public Option<MapStatus> stop(boolean success) {
+ try {
+ // Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
+ Map<String, Accumulator<Object>> internalAccumulators =
+ taskContext.internalMetricsToAccumulators();
+ if (internalAccumulators != null) {
+ internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
+ .add(getPeakMemoryUsedBytes());
+ }
+
+ if (stopping) {
+ return Option.apply(null);
+ } else {
+ stopping = true;
+ if (success) {
+ if (mapStatus == null) {
+ throw new IllegalStateException("Cannot call stop(true) without having called write()");
+ }
+ return Option.apply(mapStatus);
+ } else {
+ // The map task failed, so delete our output data.
+ shuffleBlockResolver.removeDataByMap(shuffleId, mapId);
+ return Option.apply(null);
+ }
+ }
+ } finally {
+ if (sorter != null) {
+ // If sorter is non-null, then this implies that we called stop() in response to an error,
+ // so we need to clean up memory and spill files created by the sorter
+ sorter.cleanupResources();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
deleted file mode 100644
index 4ee6a82..0000000
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-/**
- * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
- * <p>
- * Within the long, the data is laid out as follows:
- * <pre>
- * [24 bit partition number][13 bit memory page number][27 bit offset in page]
- * </pre>
- * This implies that the maximum addressable page size is 2^27 bytes = 128 megabytes, assuming that
- * our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based on the
- * 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
- * implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
- * <p>
- * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
- * optimization to future work as it will require more careful design to ensure that addresses are
- * properly aligned (e.g. by padding records).
- */
-final class PackedRecordPointer {
-
- static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes
-
- /**
- * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
- */
- static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215
-
- /** Bit mask for the lower 40 bits of a long. */
- private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;
-
- /** Bit mask for the upper 24 bits of a long */
- private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;
-
- /** Bit mask for the lower 27 bits of a long. */
- private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;
-
- /** Bit mask for the lower 51 bits of a long. */
- private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;
-
- /** Bit mask for the upper 13 bits of a long */
- private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;
-
- /**
- * Pack a record address and partition id into a single word.
- *
- * @param recordPointer a record pointer encoded by TaskMemoryManager.
- * @param partitionId a shuffle partition id (maximum value of 2^24 - 1).
- * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
- */
- public static long packPointer(long recordPointer, int partitionId) {
- assert (partitionId <= MAXIMUM_PARTITION_ID);
- // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
- // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
- final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
- final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
- return (((long) partitionId) << 40) | compressedAddress;
- }
-
- private long packedRecordPointer;
-
- public void set(long packedRecordPointer) {
- this.packedRecordPointer = packedRecordPointer;
- }
-
- public int getPartitionId() {
- return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
- }
-
- public long getRecordPointer() {
- final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
- final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
- return pageNumber | offsetInPage;
- }
-
-}
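This class is moved to the sort package by this commit rather than removed, and the encoding it documents is unchanged. A worked example of the layout, with made-up values and assuming same-package access (the class is package-private):

    // Page 5, offset 16 within the page, partition id 7.
    long recordPointer = (5L << 51) | 16L;  // TaskMemoryManager-style address
    long packed = PackedRecordPointer.packPointer(recordPointer, 7);
    // packed == (7L << 40) | (5L << 27) | 16L, i.e.
    // [partition 7 : 24 bits][page 5 : 13 bits][offset 16 : 27 bits]
    PackedRecordPointer ptr = new PackedRecordPointer();
    ptr.set(packed);
    assert ptr.getPartitionId() == 7;
    assert ptr.getRecordPointer() == recordPointer;  // round-trips exactly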
http://git-wip-us.apache.org/repos/asf/spark/blob/f6d06adf/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
deleted file mode 100644
index 7bac0dc..0000000
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/SpillInfo.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.shuffle.unsafe;
-
-import java.io.File;
-
-import org.apache.spark.storage.TempShuffleBlockId;
-
-/**
- * Metadata for a block of data written by {@link UnsafeShuffleExternalSorter}.
- */
-final class SpillInfo {
- final long[] partitionLengths;
- final File file;
- final TempShuffleBlockId blockId;
-
- public SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
- this.partitionLengths = new long[numPartitions];
- this.file = file;
- this.blockId = blockId;
- }
-}