You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/01 08:46:11 UTC
git commit: TEZ-661. Add an implementation for a non sorted,
partitioned, key-value output. (sseth)
Repository: incubator-tez
Updated Branches:
refs/heads/master 2275c4ee1 -> 26ad7037c
TEZ-661. Add an implementation for a non sorted, partitioned, key-value
output. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/26ad7037
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/26ad7037
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/26ad7037
Branch: refs/heads/master
Commit: 26ad7037c3c01347be63ad33810faee0a3922a94
Parents: 2275c4e
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Apr 30 23:45:21 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Apr 30 23:45:21 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 15 +
.../apache/tez/common/counters/TaskCounter.java | 6 +
.../tez/mapreduce/partition/MRPartitioner.java | 9 +-
.../common/sort/impl/TezIndexRecord.java | 5 +
.../BaseUnorderedPartitionedKVWriter.java | 161 ++++
.../writers/UnorderedPartitionedKVWriter.java | 791 +++++++++++++++++++
.../OnFileUnorderedPartitionedKVOutput.java | 78 +-
.../library/partitioner/HashPartitioner.java | 30 +
.../WeightedScalingMemoryDistributor.java | 3 +
.../TestUnorderedPartitionedKVWriter.java | 673 ++++++++++++++++
10 files changed, 1763 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index ae122ff..8a61c39 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -132,6 +132,21 @@ public class TezJobConfig {
public static final int DEFAULT_TEZ_RUNTIME_SORT_THREADS = 1;
/**
+ * Size of the buffer to use if not writing directly to disk.
+ */
+ public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB =
+ "tez.runtime.unordered.output.buffer.size-mb";
+ public static final int TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT = 100;
+
+ /**
+ * Maximum size for individual buffers used by the UnorderedPartitionedKVWriter.
+ * This is only meant to be used by unit tests.
+ */
+ @Private
+ public static final String TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES =
+ "tez.runtime.unordered.output.max-per-buffer.size-bytes";
+
+ /**
* Specifies a partitioner class, which is used in Tez Runtime components
* like OnFileSortedOutput
*/
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 731d4ac..8ebc6c8 100644
--- a/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-api/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -91,6 +91,12 @@ public enum TaskCounter {
*/
OUTPUT_RECORDS,
+ /**
+ * Represents the number of large records in the output - typically records which are
+ * too large to fit in an in-memory buffer and are therefore spilled directly to disk.
+ */
+ OUTPUT_LARGE_RECORDS,
+
SKIPPED_RECORDS, // Not used at the moment.
/**
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 224900e..c8edcf3 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -33,16 +33,16 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
static final Log LOG = LogFactory.getLog(MRPartitioner.class);
private final boolean useNewApi;
- private int partitions = 1;
- private org.apache.hadoop.mapreduce.Partitioner newPartitioner;
- private org.apache.hadoop.mapred.Partitioner oldPartitioner;
+ private final org.apache.hadoop.mapreduce.Partitioner newPartitioner;
+ private final org.apache.hadoop.mapred.Partitioner oldPartitioner;
public MRPartitioner(Configuration conf) {
this.useNewApi = ConfigUtils.useNewApi(conf);
- this.partitions = conf.getInt(TezJobConfig.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, 1);
+ int partitions = conf.getInt(TezJobConfig.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, 1);
if (useNewApi) {
+ oldPartitioner = null;
if (partitions > 1) {
newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
.newInstance(
@@ -58,6 +58,7 @@ public class MRPartitioner implements org.apache.tez.runtime.library.api.Partiti
};
}
} else {
+ newPartitioner = null;
if (partitions > 1) {
oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
(Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
index f6fd802..a5bacd2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezIndexRecord.java
@@ -25,6 +25,11 @@ public class TezIndexRecord {
public TezIndexRecord() { }
+ /**
+ * @param startOffset start offset within the data file
+ * @param rawLength raw data length - typically uncompressed
+ * @param partLength actual data length in file - factors in checksums and compression
+ */
public TezIndexRecord(long startOffset, long rawLength, long partLength) {
this.startOffset = startOffset;
this.rawLength = rawLength;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
new file mode 100644
index 0000000..f2dccd9
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/BaseUnorderedPartitionedKVWriter.java
@@ -0,0 +1,161 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.writers;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.TezRuntimeUtils;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+
+/**
+ * Base class for writers that emit unordered, partitioned key/value output.
+ * <p>
+ * The constructor sets up everything concrete writers share: key/value serializers for the
+ * intermediate output classes, the configured partitioner, the optional intermediate
+ * compression codec, IFile read-ahead and buffer settings, the task output file handler and
+ * the output-related task counters. Subclasses implement {@link #write(Object, Object)} and
+ * {@link #close()}.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class BaseUnorderedPartitionedKVWriter implements KeyValueWriter {
+
+  private static final Log LOG = LogFactory.getLog(BaseUnorderedPartitionedKVWriter.class);
+
+  protected final TezOutputContext outputContext;
+  protected final Configuration conf;
+  protected final Partitioner partitioner;
+  protected final Class keyClass;
+  protected final Class valClass;
+  protected final Serializer keySerializer;
+  protected final Serializer valSerializer;
+  protected final SerializationFactory serializationFactory;
+  protected final int numPartitions;
+  /** Intermediate output codec, or null when intermediate compression is disabled. */
+  protected final CompressionCodec codec;
+  protected final TezTaskOutput outputFileHandler;
+
+  protected final boolean ifileReadAhead;
+  /** Read-ahead length in bytes; 0 when read-ahead is disabled. */
+  protected final int ifileReadAheadLength;
+  protected final int ifileBufferSize;
+
+  /**
+   * Serialized size of the output records. Excludes overheads from buffer meta-information,
+   * storage format and compression (if enabled).
+   */
+  protected final TezCounter outputRecordBytesCounter;
+  /** Final number of records written; spills are not counted. */
+  protected final TezCounter outputRecordsCounter;
+  /** Size of the final output including overheads introduced by meta-information. */
+  protected final TezCounter outputBytesWithOverheadCounter;
+  /** Final output size with file-format overheads and compression factored in; spills excluded. */
+  protected final TezCounter fileOutputBytesCounter;
+  /** Additional records written to disk due to spills, not counting the final write. */
+  protected final TezCounter spilledRecordsCounter;
+  /** Additional bytes written to disk by spills, excluding the final spill. */
+  protected final TezCounter additionalSpillBytesWritternCounter;
+  /** Additional bytes read from disk while merging previous spills into a single file. */
+  protected final TezCounter additionalSpillBytesReadCounter;
+  /** Number of additional spills; the final spill is not counted. */
+  protected final TezCounter numAdditionalSpillsCounter;
+
+  /**
+   * @param outputContext context supplying counters and used for the task output file handler
+   * @param conf configuration for serialization, compression, read-ahead and partitioning
+   * @param numOutputs number of partitions the output is split into
+   * @throws RuntimeException wrapping the IOException if the partitioner cannot be instantiated
+   */
+  @SuppressWarnings("unchecked")
+  public BaseUnorderedPartitionedKVWriter(TezOutputContext outputContext, Configuration conf, int numOutputs) {
+    this.outputContext = outputContext;
+    this.conf = conf;
+    this.numPartitions = numOutputs;
+
+    // Serializers for the intermediate key and value classes.
+    this.keyClass = ConfigUtils.getIntermediateOutputKeyClass(conf);
+    this.valClass = ConfigUtils.getIntermediateOutputValueClass(conf);
+    this.serializationFactory = new SerializationFactory(conf);
+    this.keySerializer = this.serializationFactory.getSerializer(this.keyClass);
+    this.valSerializer = this.serializationFactory.getSerializer(this.valClass);
+
+    this.outputRecordBytesCounter = counterFor(outputContext, TaskCounter.OUTPUT_BYTES);
+    this.outputRecordsCounter = counterFor(outputContext, TaskCounter.OUTPUT_RECORDS);
+    this.outputBytesWithOverheadCounter =
+        counterFor(outputContext, TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+    this.fileOutputBytesCounter = counterFor(outputContext, TaskCounter.OUTPUT_BYTES_PHYSICAL);
+    this.spilledRecordsCounter = counterFor(outputContext, TaskCounter.SPILLED_RECORDS);
+    this.additionalSpillBytesWritternCounter =
+        counterFor(outputContext, TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+    this.additionalSpillBytesReadCounter =
+        counterFor(outputContext, TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+    this.numAdditionalSpillsCounter = counterFor(outputContext, TaskCounter.ADDITIONAL_SPILL_COUNT);
+
+    this.codec = createCodec(conf);
+
+    this.ifileReadAhead = conf.getBoolean(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD,
+        TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT);
+    this.ifileReadAheadLength = this.ifileReadAhead
+        ? conf.getInt(TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES,
+            TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT)
+        : 0;
+    this.ifileBufferSize = conf.getInt("io.file.buffer.size",
+        TezJobConfig.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT);
+
+    this.partitioner = createPartitioner(conf);
+    this.outputFileHandler = TezRuntimeUtils.instantiateTaskOutputManager(conf, outputContext);
+  }
+
+  /** Looks up one of the built-in task counters on the given output context. */
+  private static TezCounter counterFor(TezOutputContext context, TaskCounter which) {
+    return context.getCounters().findCounter(which);
+  }
+
+  /** Returns the configured intermediate compression codec, or null if compression is off. */
+  private static CompressionCodec createCodec(Configuration config) {
+    if (!ConfigUtils.shouldCompressIntermediateOutput(config)) {
+      return null;
+    }
+    Class<? extends CompressionCodec> codecClass =
+        ConfigUtils.getIntermediateOutputCompressorClass(config, DefaultCodec.class);
+    return ReflectionUtils.newInstance(codecClass, config);
+  }
+
+  /** Logs and instantiates the configured partitioner, rethrowing failures unchecked. */
+  private static Partitioner createPartitioner(Configuration config) {
+    LOG.info("Instantiating Partitioner: [" + config.get(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS) + "]");
+    try {
+      return TezRuntimeUtils.instantiatePartitioner(config);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public abstract void write(Object key, Object value) throws IOException;
+
+  public abstract List<Event> close() throws IOException, InterruptedException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
new file mode 100644
index 0000000..2d51a7d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -0,0 +1,791 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.writers;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.ByteString;
+
+public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWriter {
+
+  private static final Log LOG = LogFactory.getLog(UnorderedPartitionedKVWriter.class);
+
+  // Per-record meta-data layout: three ints stored ahead of the key bytes.
+  private static final int INT_SIZE = 4;
+  private static final int NUM_META = 3; // Number of meta fields.
+  private static final int INDEX_KEYLEN = 0; // KeyLength index
+  private static final int INDEX_VALLEN = 1; // ValLength index
+  private static final int INDEX_NEXT = 2; // Next Record Index.
+  private static final int META_SIZE = NUM_META * INT_SIZE; // Size of total meta-data
+
+  // Per-partition allowance added to the spill-size estimate passed to the output file handler.
+  private static final int APPROX_HEADER_LENGTH = 150;
+
+  // Maybe setup a separate statistics class which can be shared between the
+  // buffer and the main path instead of having multiple arrays.
+
+  private final long availableMemory;
+  @VisibleForTesting
+  final WrappedBuffer[] buffers;
+  /** Buffers that have been released and can be written into again. */
+  @VisibleForTesting
+  final BlockingQueue<WrappedBuffer> availableBuffers;
+  private final ByteArrayOutputStream baos;
+  private final DataOutputStream dos;
+  @VisibleForTesting
+  WrappedBuffer currentBuffer;
+  private final FileSystem rfs;
+
+  /** Index information for every non-final spill; appended to from spill threads. */
+  private final List<SpillInfo> spillInfoList = Collections
+      .synchronizedList(new ArrayList<SpillInfo>());
+
+  private final ListeningExecutorService spillExecutor;
+
+  private final int[] numRecordsPerPartition;
+  private volatile long spilledSize = 0;
+
+  /**
+   * Number of records which were too large to fit into a single buffer and were therefore
+   * written directly to disk.
+   */
+  protected final TezCounter outputLargeRecordsCounter;
+
+  @VisibleForTesting
+  int numBuffers;
+  @VisibleForTesting
+  int sizePerBuffer;
+  @VisibleForTesting
+  int numInitializedBuffers;
+
+  /**
+   * Failure reported by an asynchronous spill (presumably assigned by the spill callback - confirm).
+   * Checked by write() without holding spillLock, so it must be volatile for a failure recorded on
+   * a spill thread to become visible to the writer thread.
+   */
+  private volatile Throwable spillException;
+  private AtomicBoolean isShutdown = new AtomicBoolean(false);
+  @VisibleForTesting
+  final AtomicInteger numSpills = new AtomicInteger(0);
+  private final AtomicInteger pendingSpillCount = new AtomicInteger(0);
+
+  private final ReentrantLock spillLock = new ReentrantLock();
+  private final Condition spillInProgress = spillLock.newCondition();
+
+  /**
+   * Creates a writer that buffers records in memory and spills full buffers asynchronously.
+   * Only the first buffer is allocated up front; further buffers are allocated on demand.
+   *
+   * @param outputContext context of the output this writer belongs to
+   * @param conf configuration for serialization, compression, partitioning and buffer sizing
+   * @param numOutputs number of partitions to write
+   * @param availableMemoryBytes total memory to split across the in-memory buffers; must be > 0
+   * @throws IOException propagated from FileSystem.getLocal
+   */
+  public UnorderedPartitionedKVWriter(TezOutputContext outputContext, Configuration conf,
+      int numOutputs, long availableMemoryBytes) throws IOException {
+    super(outputContext, conf, numOutputs);
+    // The message states the requirement, matching the condition being checked.
+    Preconditions.checkArgument(availableMemoryBytes > 0, "availableMemory should be > 0 bytes");
+    // Ideally, should be significantly larger.
+    availableMemory = availableMemoryBytes;
+
+    // Allow unit tests to control the buffer sizes.
+    int maxSingleBufferSizeBytes = conf.getInt(
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES, Integer.MAX_VALUE);
+    computeNumBuffersAndSize(maxSingleBufferSizeBytes);
+    LOG.info("Running with numBuffers=" + numBuffers + ", sizePerBuffer=" + sizePerBuffer);
+    availableBuffers = new LinkedBlockingQueue<WrappedBuffer>();
+    buffers = new WrappedBuffer[numBuffers];
+    // Set up only the first buffer to start with.
+    buffers[0] = new WrappedBuffer(numOutputs, sizePerBuffer);
+    numInitializedBuffers = 1;
+    LOG.info("Initialize Buffer #" + numInitializedBuffers + " with size=" + sizePerBuffer);
+    currentBuffer = buffers[0];
+    baos = new ByteArrayOutputStream();
+    dos = new DataOutputStream(baos);
+    keySerializer.open(dos);
+    valSerializer.open(dos);
+    rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw();
+
+    // Single daemon thread: at most one spill runs at a time.
+    ExecutorService executor = Executors.newFixedThreadPool(
+        1,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat(
+                "UnorderedOutSpiller ["
+                    + TezUtils.cleanVertexName(outputContext.getDestinationVertexName()) + "]")
+            .build());
+    spillExecutor = MoreExecutors.listeningDecorator(executor);
+    numRecordsPerPartition = new int[numPartitions];
+
+    outputLargeRecordsCounter = outputContext.getCounters().findCounter(
+        TaskCounter.OUTPUT_LARGE_RECORDS);
+  }
+
+  /**
+   * Splits availableMemory into at least two buffers, none larger than bufferLimit bytes.
+   * The per-buffer size is rounded down to a multiple of INT_SIZE, because record meta-data is
+   * stored on int boundaries.
+   */
+  private void computeNumBuffersAndSize(int bufferLimit) {
+    long wholeBuffers = availableMemory / bufferLimit;
+    boolean hasRemainder = (availableMemory % bufferLimit) != 0;
+    int required = (int) wholeBuffers + (hasRemainder ? 1 : 0);
+    numBuffers = Math.max(2, required);
+    int evenShare = (int) (availableMemory / numBuffers);
+    sizePerBuffer = evenShare - (evenShare % INT_SIZE);
+  }
+
+ @Override
+ public void write(Object key, Object value) throws IOException {
+ // Skipping checks for key-value types. IFile takes care of these, but should be removed from
+ // there as well.
+
+ // How expensive are checks like these ?
+ if (isShutdown.get()) {
+ throw new RuntimeException("Writer already closed");
+ }
+ if (spillException != null) {
+ // Already reported as a fatalError - report to the user code
+ throw new IOException("Exception during spill", new IOException(spillException));
+ }
+ int partition = partitioner.getPartition(key, value, numPartitions);
+ write(key, value, partition);
+ }
+
+ /**
+ * Serializes one record into currentBuffer and links it into its partition's record chain.
+ * <p>
+ * Record layout inside the buffer: optional padding up to an INT_SIZE boundary, then three
+ * meta ints (key length, value length, position of the partition's previous record), then the
+ * key bytes, then the value bytes. The serializers write through dos/baos, which presumably
+ * write into currentBuffer and throw BufferTooSmallException when it is full - confirm against
+ * the ByteArrayOutputStream implementation.
+ * <p>
+ * If the record does not fit and the buffer already held data (metaStart != 0), the buffer is
+ * handed off via setupNextBuffer() and the write is retried. If it does not fit even though it
+ * started at offset 0, it is written straight to disk via writeLargeRecord(). A failed attempt
+ * leaves its partial bytes in the old buffer, but they are never linked into partitionPositions.
+ *
+ * @param key the key to serialize
+ * @param value the value to serialize
+ * @param partition target partition computed by the partitioner
+ */
+ @SuppressWarnings("unchecked")
+ private void write(Object key, Object value, int partition) throws IOException {
+ // Wrap to 4 byte (Int) boundary for metaData
+ int mod = currentBuffer.nextPosition % INT_SIZE;
+ int metaSkip = mod == 0 ? 0 : (INT_SIZE - mod);
+ if (currentBuffer.availableSize < (META_SIZE + metaSkip)) {
+ // Move over to the next buffer.
+ metaSkip = 0;
+ setupNextBuffer();
+ }
+ currentBuffer.nextPosition += metaSkip;
+ // Byte offset of this record's meta ints; 0 means the record starts an empty buffer.
+ int metaStart = currentBuffer.nextPosition;
+ currentBuffer.availableSize -= (META_SIZE + metaSkip);
+ currentBuffer.nextPosition += META_SIZE;
+ try {
+ keySerializer.serialize(key);
+ } catch (BufferTooSmallException e) {
+ if (metaStart == 0) { // Started writing at the start of the buffer. Write Key to disk.
+ // Key too large for any buffer. Write entire record to disk.
+ currentBuffer.reset();
+ writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
+ return;
+ } else { // Exceeded length on current buffer.
+ // Try resetting the buffer to the next one, if this was not the start of a buffer,
+ // and begin spilling the current buffer to disk if it has any records.
+ setupNextBuffer();
+ write(key, value, partition);
+ return;
+ }
+ }
+ // Key bytes occupy [metaStart + META_SIZE, valStart).
+ int valStart = currentBuffer.nextPosition;
+ try {
+ valSerializer.serialize(value);
+ } catch (BufferTooSmallException e) {
+ // Value too large for current buffer, or K-V too large for entire buffer.
+ if (metaStart == 0) {
+ // Key + Value too large for a single buffer.
+ currentBuffer.reset();
+ writeLargeRecord(key, value, partition, numSpills.incrementAndGet());
+ return;
+ } else { // Exceeded length on current buffer.
+ // Try writing key+value to a new buffer - will fall back to disk if that fails.
+ setupNextBuffer();
+ write(key, value, partition);
+ return;
+ }
+ }
+
+ // Meta-data updates
+ // metaBuffer is addressed in ints, so convert the byte offset to an int slot.
+ int metaIndex = metaStart / INT_SIZE;
+ // Previous head of this partition's chain; stored as this record's "next" link.
+ int indexNext = currentBuffer.partitionPositions[partition];
+
+ currentBuffer.metaBuffer.put(metaIndex + INDEX_KEYLEN, (valStart - (metaStart + META_SIZE)));
+ currentBuffer.metaBuffer.put(metaIndex + INDEX_VALLEN, (currentBuffer.nextPosition - valStart));
+ currentBuffer.metaBuffer.put(metaIndex + INDEX_NEXT, indexNext);
+ currentBuffer.skipSize += metaSkip; // For size estimation
+ // Update stats on number of records
+ outputRecordBytesCounter.increment(currentBuffer.nextPosition - (metaStart + META_SIZE));
+ outputBytesWithOverheadCounter.increment((currentBuffer.nextPosition - metaStart) + metaSkip);
+ outputRecordsCounter.increment(1);
+ // This record becomes the new head, so chains are walked newest-first.
+ currentBuffer.partitionPositions[partition] = metaStart;
+ currentBuffer.recordsPerPartition[partition]++;
+ currentBuffer.numRecords++;
+
+ }
+
+  /**
+   * Retires the current buffer and makes another buffer current.
+   * <p>
+   * A buffer holding no records is simply reset and reused. Otherwise:
+   * <ul>
+   * <li>its per-partition record counts are folded into the writer-wide totals;</li>
+   * <li>an asynchronous spill is submitted for it;</li>
+   * <li>the next buffer from getNextAvailableBuffer() becomes current. That call may block until
+   * a buffer is returned to availableBuffers.</li>
+   * </ul>
+   *
+   * @throws IOException if interrupted while waiting for a free buffer
+   */
+  private void setupNextBuffer() throws IOException {
+    if (currentBuffer.numRecords == 0) {
+      currentBuffer.reset();
+      return;
+    }
+    // Update overall stats
+    LOG.info("Moving to next buffer and triggering spill");
+    updateGlobalStats(currentBuffer);
+
+    pendingSpillCount.incrementAndGet();
+
+    // Capture the number once so the spill and its callback always refer to the same spill,
+    // rather than re-reading the shared counter after the increment.
+    int spillNumber = numSpills.incrementAndGet();
+    ListenableFuture<SpillResult> future = spillExecutor.submit(new SpillCallable(currentBuffer,
+        spillNumber, codec, spilledRecordsCounter, false));
+    Futures.addCallback(future, new SpillCallback(spillNumber));
+
+    currentBuffer = getNextAvailableBuffer();
+  }
+
+  /** Adds the per-partition record counts of {@code buffer} to the writer-wide totals. */
+  private void updateGlobalStats(WrappedBuffer buffer) {
+    int p = 0;
+    while (p < numPartitions) {
+      numRecordsPerPartition[p] += buffer.recordsPerPartition[p];
+      p++;
+    }
+  }
+
+  /**
+   * Returns the buffer to write into next, checking three sources in order:
+   * <ol>
+   * <li>a previously released buffer from availableBuffers, if one is queued;</li>
+   * <li>a newly allocated buffer, while fewer than numBuffers have been created;</li>
+   * <li>otherwise, the next buffer put on availableBuffers (this call blocks until then).</li>
+   * </ol>
+   *
+   * @throws IOException if interrupted while waiting; the interrupt flag is restored first
+   */
+  private WrappedBuffer getNextAvailableBuffer() throws IOException {
+    WrappedBuffer released = availableBuffers.poll();
+    if (released != null) {
+      return released;
+    }
+    if (numInitializedBuffers < numBuffers) {
+      WrappedBuffer fresh = new WrappedBuffer(numPartitions, sizePerBuffer);
+      buffers[numInitializedBuffers] = fresh;
+      numInitializedBuffers++;
+      return fresh;
+    }
+    // Every buffer has been allocated and none is free right now - wait for one.
+    try {
+      return availableBuffers.take();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while waiting for next buffer", e);
+    }
+  }
+
+ // All spills using compression for now.
+ private class SpillCallable implements Callable<SpillResult> {
+
+ private final WrappedBuffer wrappedBuffer;
+ private final CompressionCodec codec;
+ private final TezCounter numRecordsCounter;
+ private final int spillNumber;
+ private final boolean isFinalSpill;
+
+ public SpillCallable(WrappedBuffer wrappedBuffer, int spillNumber, CompressionCodec codec,
+ TezCounter numRecordsCounter, boolean isFinal) {
+ this.wrappedBuffer = wrappedBuffer;
+ this.codec = codec;
+ this.numRecordsCounter = numRecordsCounter;
+ this.spillNumber = spillNumber;
+ this.isFinalSpill = isFinal;
+ }
+
+ @Override
+ public SpillResult call() throws IOException {
+ // This should not be called with an empty buffer. Check before invoking.
+
+ // Number of parallel spills determined by number of threads.
+ // Last spill synchronization handled separately.
+ SpillResult spillResult = null;
+ long spillSize = wrappedBuffer.nextPosition + numPartitions * APPROX_HEADER_LENGTH;
+ Path outPath = null;
+ if (isFinalSpill) {
+ outPath = outputFileHandler.getOutputFileForWrite(spillSize);
+ } else {
+ outPath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
+ }
+ FSDataOutputStream out = rfs.create(outPath);
+ TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
+ DataInputBuffer key = new DataInputBuffer();
+ DataInputBuffer val = new DataInputBuffer();
+ for (int i = 0; i < numPartitions; i++) {
+ IFile.Writer writer = null;
+ try {
+ long segmentStart = out.getPos();
+ if (wrappedBuffer.partitionPositions[i] == WrappedBuffer.PARTITION_ABSENT_POSITION) {
+ // Skip empty partition.
+ continue;
+ }
+ writer = new Writer(conf, out, keyClass, valClass, codec, numRecordsCounter, null);
+ writePartition(wrappedBuffer.partitionPositions[i], wrappedBuffer, writer, key, val);
+ writer.close();
+ if (isFinalSpill) {
+ fileOutputBytesCounter.increment(writer.getCompressedLength());
+ } else {
+ additionalSpillBytesWritternCounter.increment(writer.getCompressedLength());
+ }
+ spillResult = new SpillResult(writer.getCompressedLength(), this.wrappedBuffer);
+ TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
+ writer.getCompressedLength());
+ spillRecord.putIndex(indexRecord, i);
+ writer = null;
+ } finally {
+ if (writer != null) {
+ writer.close();
+ }
+ }
+ }
+ if (isFinalSpill) {
+ long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ Path finalSpillFile = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+ spillRecord.writeToFile(finalSpillFile, conf);
+ fileOutputBytesCounter.increment(indexFileSizeEstimate);
+ LOG.info("Finished final and only spill");
+ } else {
+ SpillInfo spillInfo = new SpillInfo(spillRecord, outPath);
+ spillInfoList.add(spillInfo);
+ numAdditionalSpillsCounter.increment(1);
+ LOG.info("Finished spill " + spillNumber);
+ }
+ return spillResult;
+ }
+ }
+
+  /**
+   * Appends every record of one partition to {@code writer}.
+   * <p>
+   * Starts at byte offset {@code pos} and follows each record's INDEX_NEXT link until
+   * WrappedBuffer.PARTITION_ABSENT_POSITION. Because write() makes each new record the chain
+   * head, records come out newest-first. {@code keyBuffer} and {@code valBuffer} are reused as
+   * views into wrappedBuffer.buffer.
+   */
+  private void writePartition(int pos, WrappedBuffer wrappedBuffer, Writer writer,
+      DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException {
+    for (int recordPos = pos; recordPos != WrappedBuffer.PARTITION_ABSENT_POSITION;
+        recordPos = wrappedBuffer.metaBuffer.get(recordPos / INT_SIZE + INDEX_NEXT)) {
+      int slot = recordPos / INT_SIZE;
+      int keyLen = wrappedBuffer.metaBuffer.get(slot + INDEX_KEYLEN);
+      int valLen = wrappedBuffer.metaBuffer.get(slot + INDEX_VALLEN);
+      int keyOffset = recordPos + META_SIZE;
+      keyBuffer.reset(wrappedBuffer.buffer, keyOffset, keyLen);
+      valBuffer.reset(wrappedBuffer.buffer, keyOffset + keyLen, valLen);
+      writer.append(keyBuffer, valBuffer);
+    }
+  }
+
+  /**
+   * Returns the number of bytes this writer requests up front, as configured by
+   * TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB.
+   *
+   * @param conf configuration to read the buffer size from
+   * @param maxAvailableTaskMemory currently unused
+   * @return the requested buffer size in bytes
+   * @throws IllegalArgumentException if the configured size is not positive
+   */
+  public static long getInitialMemoryRequirement(Configuration conf, long maxAvailableTaskMemory) {
+    int initialMemRequestMb = conf.getInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB,
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB_DEFAULT);
+    // Reject negative values too, as the message has always stated.
+    Preconditions.checkArgument(initialMemRequestMb > 0,
+        TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB + " should be larger than 0");
+    // Widen before shifting: an int shift overflows for values >= 2048 MB.
+    long reqBytes = ((long) initialMemRequestMb) << 20;
+    LOG.info("Requested BufferSize (" + TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB
+        + ") : " + initialMemRequestMb);
+    return reqBytes;
+  }
+
+ @Override
+ public List<Event> close() throws IOException, InterruptedException {
+ isShutdown.set(true);
+ spillLock.lock();
+ LOG.info("Waiting for all spills to complete : Pending : " + pendingSpillCount.get());
+ try {
+ while (pendingSpillCount.get() != 0 && spillException == null) {
+ spillInProgress.await();
+ }
+ } finally {
+ spillLock.unlock();
+ }
+ if (spillException != null) {
+ LOG.fatal("Error during spill, throwing");
+ // Assuming close will be called on the same thread as the write
+ cleanup();
+ currentBuffer.cleanup();
+ currentBuffer = null;
+ if (spillException instanceof IOException) {
+ throw (IOException) spillException;
+ } else {
+ throw new IOException(spillException);
+ }
+ } else {
+ LOG.info("All spills complete");
+ // Assuming close will be called on the same thread as the write
+ cleanup();
+ if (numSpills.get() > 0) {
+ mergeAll();
+ } else {
+ finalSpill();
+ }
+
+ currentBuffer.cleanup();
+ currentBuffer = null;
+ }
+
+ return Collections.singletonList(generateEvent());
+ }
+
+  /**
+   * Stops the spill executor, releases every buffer except the one currently being written, and
+   * empties the pool of available buffers.
+   */
+  private void cleanup() {
+    if (spillExecutor != null) {
+      spillExecutor.shutdownNow();
+    }
+    for (int idx = 0; idx < buffers.length; idx++) {
+      WrappedBuffer candidate = buffers[idx];
+      if (candidate == null || candidate == currentBuffer) {
+        continue;
+      }
+      candidate.cleanup();
+      buffers[idx] = null;
+    }
+    availableBuffers.clear();
+  }
+
+  /**
+   * Builds the event that describes where this output can be fetched from.
+   *
+   * <p>Partitions that received no records are listed in a compressed bitset. Host, shuffle port
+   * and path component are filled in only when at least one partition has data.
+   */
+  private Event generateEvent() throws IOException {
+    DataMovementEventPayloadProto.Builder payloadBuilder =
+        DataMovementEventPayloadProto.newBuilder();
+
+    String host = getHost();
+    int shufflePort = getShufflePort();
+
+    BitSet emptyPartitions = new BitSet();
+    for (int partition = 0; partition < numPartitions; partition++) {
+      if (numRecordsPerPartition[partition] == 0) {
+        emptyPartitions.set(partition);
+      }
+    }
+    int emptyCount = emptyPartitions.cardinality();
+
+    if (emptyCount != 0) {
+      byte[] emptyBits = TezUtils.toByteArray(emptyPartitions);
+      payloadBuilder.setEmptyPartitions(TezUtils.compressByteArrayToByteString(emptyBits));
+    }
+    if (emptyCount != numPartitions) {
+      // At least one partition has data, so consumers need the fetch location.
+      payloadBuilder.setHost(host);
+      payloadBuilder.setPort(shufflePort);
+      payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+    }
+
+    byte[] payload = payloadBuilder.build().toByteArray();
+    return new CompositeDataMovementEvent(0, numPartitions, payload);
+  }
+
+  /**
+   * Writes the current buffer as the final output; close() uses this when no spills were recorded.
+   * Does nothing if the buffer holds no data.
+   */
+  private void finalSpill() throws IOException {
+    if (currentBuffer.nextPosition != 0) {
+      updateGlobalStats(currentBuffer);
+      new SpillCallable(currentBuffer, 0, codec, null, true).call();
+    }
+  }
+
+  /**
+   * Writes the final output file and its index; close() uses this when at least one spill
+   * happened.
+   *
+   * <p>For each partition that received records, one IFile segment is written. It holds the
+   * partition's records still in {@code currentBuffer}, followed by that partition's segment from
+   * every spill in {@code spillInfoList}. Partitions without records get no segment.
+   */
+  private void mergeAll() throws IOException {
+    long expectedSize = spilledSize;
+    if (currentBuffer.nextPosition != 0) {
+      // Bytes still in memory minus per-record metadata (numRecords * META_SIZE) and skipSize,
+      // plus an approximate header per partition.
+      expectedSize += currentBuffer.nextPosition - (currentBuffer.numRecords * META_SIZE)
+          - currentBuffer.skipSize + numPartitions * APPROX_HEADER_LENGTH;
+      // Update final statistics.
+      updateGlobalStats(currentBuffer);
+    }
+
+    long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+    Path finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
+    Path finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+
+    TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions);
+
+    DataInputBuffer keyBuffer = new DataInputBuffer();
+    DataInputBuffer valBuffer = new DataInputBuffer();
+
+    DataInputBuffer keyBufferIFile = new DataInputBuffer();
+    DataInputBuffer valBufferIFile = new DataInputBuffer();
+
+    FSDataOutputStream out = null;
+    try {
+      out = rfs.create(finalOutPath);
+
+      for (int i = 0; i < numPartitions; i++) {
+        long segmentStart = out.getPos();
+        if (numRecordsPerPartition[i] == 0) {
+          LOG.info("Skipping partition: " + i + " in final merge since it has no records");
+          continue;
+        }
+        Writer writer = new Writer(conf, out, keyClass, valClass, codec, null, null);
+        try {
+          if (currentBuffer.nextPosition != 0
+              && currentBuffer.partitionPositions[i] != WrappedBuffer.PARTITION_ABSENT_POSITION) {
+            // Write current buffer.
+            writePartition(currentBuffer.partitionPositions[i], currentBuffer, writer, keyBuffer,
+                valBuffer);
+          }
+          synchronized (spillInfoList) {
+            for (SpillInfo spillInfo : spillInfoList) {
+              TezIndexRecord indexRecord = spillInfo.spillRecord.getIndex(i);
+              if (indexRecord.getPartLength() == 0) {
+                // Skip empty partitions within a spill
+                continue;
+              }
+              copySpilledSegment(spillInfo.outPath, indexRecord, writer, keyBufferIFile,
+                  valBufferIFile);
+            }
+          }
+          writer.close();
+          fileOutputBytesCounter.increment(writer.getCompressedLength());
+          TezIndexRecord indexRecord = new TezIndexRecord(segmentStart, writer.getRawLength(),
+              writer.getCompressedLength());
+          writer = null;
+          finalSpillRecord.putIndex(indexRecord, i);
+        } finally {
+          if (writer != null) {
+            writer.close();
+          }
+        }
+      }
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+    finalSpillRecord.writeToFile(finalIndexPath, conf);
+    fileOutputBytesCounter.increment(indexFileSizeEstimate);
+    LOG.info("Finished final spill after merging : " + numSpills.get() + " spills");
+  }
+
+  /**
+   * Appends every record of one segment of a spill file to {@code writer}. The spill file is
+   * closed even when seeking, reading or appending fails part way through.
+   */
+  private void copySpilledSegment(Path spillPath, TezIndexRecord indexRecord, Writer writer,
+      DataInputBuffer keyBuffer, DataInputBuffer valBuffer) throws IOException {
+    FSDataInputStream in = rfs.open(spillPath);
+    IFile.Reader reader = null;
+    try {
+      in.seek(indexRecord.getStartOffset());
+      reader = new IFile.Reader(in, indexRecord.getPartLength(), codec, null,
+          additionalSpillBytesReadCounter, ifileReadAhead, ifileReadAheadLength, ifileBufferSize);
+      while (reader.nextRawKey(keyBuffer)) {
+        // TODO Inefficient. If spills are not compressed, a direct copy should be possible
+        // given the current IFile format. Also extremely inefficient for large records,
+        // since the entire record will be read into memory.
+        reader.nextRawValue(valBuffer);
+        writer.append(keyBuffer, valBuffer);
+      }
+    } finally {
+      if (reader != null) {
+        // Closing the reader is the only close on the success path, so it is relied on to
+        // release the stream as well.
+        reader.close();
+      } else {
+        in.close();
+      }
+    }
+  }
+
+  /**
+   * Writes a single record straight to its own spill file, bypassing the in-memory buffers. The
+   * file is registered in {@code spillInfoList}; only {@code partition} receives a segment in it.
+   *
+   * @param key record key
+   * @param value record value
+   * @param partition partition the record belongs to
+   * @param spillNumber identifies the spill file to create
+   */
+  private void writeLargeRecord(final Object key, final Object value, final int partition,
+      final int spillNumber) throws IOException {
+    numAdditionalSpillsCounter.increment(1);
+    // Size estimate passed to getSpillFileForWrite.
+    long size = sizePerBuffer - (currentBuffer.numRecords * META_SIZE) - currentBuffer.skipSize
+        + numPartitions * APPROX_HEADER_LENGTH;
+    FSDataOutputStream out = null;
+    long outSize = 0;
+    try {
+      final TezSpillRecord spillRecord = new TezSpillRecord(numPartitions);
+      final Path outPath = outputFileHandler.getSpillFileForWrite(spillNumber, size);
+      out = rfs.create(outPath);
+      // Only the record's own partition gets a segment; other index entries are left untouched.
+      boolean validPartition = partition >= 0 && partition < numPartitions;
+      if (validPartition) {
+        final long recordStart = out.getPos();
+        spilledRecordsCounter.increment(1);
+        Writer writer = null;
+        try {
+          writer = new IFile.Writer(conf, out, keyClass, valClass, codec, null, null);
+          writer.append(key, value);
+          outputLargeRecordsCounter.increment(1);
+          numRecordsPerPartition[partition]++;
+          writer.close();
+          long compressedLength = writer.getCompressedLength();
+          additionalSpillBytesWritternCounter.increment(compressedLength);
+          TezIndexRecord indexRecord = new TezIndexRecord(recordStart, writer.getRawLength(),
+              compressedLength);
+          spillRecord.putIndex(indexRecord, partition);
+          outSize = compressedLength;
+          writer = null;
+        } finally {
+          if (writer != null) {
+            writer.close();
+          }
+        }
+      }
+      spillInfoList.add(new SpillInfo(spillRecord, outPath));
+      LOG.info("Finished writing large record of size " + outSize + " to spill file " + spillNumber);
+    } finally {
+      if (out != null) {
+        out.close();
+      }
+    }
+  }
+
+  /**
+   * OutputStream that copies serialized bytes directly into the unused tail of
+   * {@code currentBuffer}. A write that does not fit in the remaining space throws
+   * {@link BufferTooSmallException} before touching the buffer.
+   *
+   * <p>Unrelated to {@link java.io.ByteArrayOutputStream} despite the shared simple name.
+   */
+  private class ByteArrayOutputStream extends OutputStream {
+
+    // Reused for single-byte writes so write(int) does not allocate.
+    private final byte[] scratch = new byte[1];
+
+    @Override
+    public void write(int v) throws IOException {
+      scratch[0] = (byte) v;
+      write(scratch, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      if (len > currentBuffer.availableSize) {
+        throw new BufferTooSmallException();
+      }
+      System.arraycopy(b, off, currentBuffer.buffer, currentBuffer.nextPosition, len);
+      currentBuffer.nextPosition += len;
+      currentBuffer.availableSize -= len;
+    }
+  }
+
+  /**
+   * A fixed-size byte array that holds serialized records, plus an int view over the same bytes
+   * used for per-record metadata. For each partition it tracks the head of that partition's record
+   * chain and a record count.
+   */
+  private static class WrappedBuffer {
+
+    private static final int PARTITION_ABSENT_POSITION = -1;
+
+    private final int[] partitionPositions;
+    private final int[] recordsPerPartition;
+    private final int numPartitions;
+    private final int size;
+
+    private byte[] buffer;
+    private IntBuffer metaBuffer;
+
+    private int numRecords = 0;
+    private int skipSize = 0;
+
+    private int nextPosition = 0;
+    private int availableSize;
+
+    WrappedBuffer(int numPartitions, int size) {
+      this.partitionPositions = new int[numPartitions];
+      this.recordsPerPartition = new int[numPartitions];
+      this.numPartitions = numPartitions;
+      // Round down to a multiple of INT_SIZE so the whole array can be viewed as ints.
+      this.size = size - (size % INT_SIZE);
+      this.buffer = new byte[this.size];
+      this.metaBuffer = ByteBuffer.wrap(buffer).order(ByteOrder.nativeOrder()).asIntBuffer();
+      reset();
+    }
+
+    /** Returns the buffer to its empty state so it can be filled again. */
+    void reset() {
+      for (int p = 0; p < numPartitions; p++) {
+        partitionPositions[p] = PARTITION_ABSENT_POSITION;
+        recordsPerPartition[p] = 0;
+      }
+      numRecords = 0;
+      skipSize = 0;
+      nextPosition = 0;
+      availableSize = size;
+    }
+
+    /** Drops the references to the backing storage so it can be garbage collected. */
+    void cleanup() {
+      metaBuffer = null;
+      buffer = null;
+    }
+  }
+
+  /**
+   * Thrown by the buffer-backed output stream when a write does not fit in the space remaining in
+   * the current buffer.
+   */
+  private static class BufferTooSmallException extends IOException {
+    private static final long serialVersionUID = 1L;
+  }
+
+ /**
+ * Callback attached to each asynchronous spill.
+ * On success: adds the spill size to spilledSize, resets the spilled buffer and returns it to
+ * availableBuffers, then signals spillInProgress once the pending spill count reaches zero.
+ * On failure: stores the error in spillException, reports it via outputContext.fatalError and
+ * signals spillInProgress so that a waiting close() can observe the failure.
+ */
+ private class SpillCallback implements FutureCallback<SpillResult> {
+
+ private final int spillNumber;
+
+ SpillCallback(int spillNumber) {
+ this.spillNumber = spillNumber;
+ }
+
+ @Override
+ public void onSuccess(SpillResult result) {
+ LOG.info("Spill# " + spillNumber + " complete.");
+ // NOTE(review): unsynchronized read-modify-write of a shared field; only safe if spill
+ // callbacks never run concurrently (e.g. a single spill thread) -- confirm.
+ spilledSize += result.spillSize;
+ try {
+ result.wrappedBuffer.reset();
+ availableBuffers.add(result.wrappedBuffer);
+
+ } catch (Throwable e) {
+ LOG.fatal("Failure while attempting to reset buffer after spill", e);
+ outputContext.fatalError(e, "Failure while attempting to reset buffer after spill");
+ }
+
+ // Decremented even if the reset above failed, so the pending count still reaches zero.
+ spillLock.lock();
+ try {
+ if (pendingSpillCount.decrementAndGet() == 0) {
+ spillInProgress.signal();
+ }
+ } finally {
+ spillLock.unlock();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // spillException is set so that close() can throw the failure back to the user. Requires
+ // synchronization. Consider removing it in favor of having Tez kill the task.
+ LOG.fatal("Failure while spilling to disk", t);
+ spillException = t;
+ outputContext.fatalError(t, "Failure while spilling to disk");
+ // The pending count is not decremented here; close() stops waiting as soon as
+ // spillException is non-null.
+ spillLock.lock();
+ try {
+ spillInProgress.signal();
+ } finally {
+ spillLock.unlock();
+ }
+ }
+ }
+
+  /**
+   * Outcome of a spill: its size (accumulated into spilledSize by the spill callback) and the
+   * buffer that was spilled.
+   */
+  private static class SpillResult {
+    final long spillSize;
+    final WrappedBuffer wrappedBuffer;
+
+    SpillResult(long spillSize, WrappedBuffer spilledBuffer) {
+      this.wrappedBuffer = spilledBuffer;
+      this.spillSize = spillSize;
+    }
+  }
+
+  /** A spill file on disk together with the per-partition index describing its segments. */
+  private static class SpillInfo {
+    final TezSpillRecord spillRecord;
+    final Path outPath;
+
+    SpillInfo(TezSpillRecord spillRecord, Path spillPath) {
+      this.outPath = spillPath;
+      this.spillRecord = spillRecord;
+    }
+  }
+
+  /**
+   * Returns the value of the environment variable named by
+   * {@link ApplicationConstants.Environment#NM_HOST}; overridable for tests.
+   */
+  @VisibleForTesting
+  String getHost() {
+    String hostVariable = ApplicationConstants.Environment.NM_HOST.toString();
+    return System.getenv(hostVariable);
+  }
+
+  /**
+   * Returns the shuffle port decoded from the shuffle handler's service-provider metadata;
+   * overridable for tests.
+   */
+  @VisibleForTesting
+  int getShufflePort() throws IOException {
+    return ShuffleUtils.deserializeShuffleProviderMetaData(
+        outputContext.getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
index ff2af32..b800039 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java
@@ -18,18 +18,88 @@
package org.apache.tez.runtime.library.output;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.Writer;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
+
+import com.google.common.base.Preconditions;
/**
- * <code>OnFileUnorderedPartitionedKVOutput</code> is a {@link LogicalOutput}
- * which can be used to write Key-Value pairs. The key-value pairs are written
- * to the correct partition based on the configured Partitioner.
+ * <code>OnFileUnorderedPartitionedKVOutput</code> is a {@link LogicalOutput} which can be used to
+ * write Key-Value pairs. The key-value pairs are written to the correct partition based on the
+ * configured Partitioner.
*
* This currently acts as a usable placeholder for writing unordered output (the data is sorted,
* which should be functionally correct since there's no guarantees on order without a sort).
* TEZ-661 to add a proper implementation.
*
*/
-public class OnFileUnorderedPartitionedKVOutput extends OnFileSortedOutput {
+public class OnFileUnorderedPartitionedKVOutput implements LogicalOutput {
+
+ private static final Log LOG = LogFactory.getLog(OnFileUnorderedPartitionedKVOutput.class);
+
+ private TezOutputContext outputContext;
+ private Configuration conf;
+ private int numPhysicalOutputs;
+ private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler;
+ private UnorderedPartitionedKVWriter kvWriter;
+ private final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ @Override
+ public synchronized List<Event> initialize(TezOutputContext outputContext) throws Exception {
+ this.outputContext = outputContext;
+ this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
+ this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+ this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler();
+ outputContext.requestInitialMemory(
+ UnorderedPartitionedKVWriter.getInitialMemoryRequirement(conf,
+ outputContext.getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler);
+ return Collections.emptyList();
+ }
+
+ @Override
+ public synchronized void start() throws Exception {
+ if (!isStarted.get()) {
+ memoryUpdateCallbackHandler.validateUpdateReceived();
+ this.kvWriter = new UnorderedPartitionedKVWriter(outputContext, conf, numPhysicalOutputs,
+ memoryUpdateCallbackHandler.getMemoryAssigned());
+ isStarted.set(true);
+ }
+ }
+
+ @Override
+ public synchronized Writer getWriter() throws Exception {
+ Preconditions.checkState(isStarted.get(), "Cannot get writer before starting the Output");
+ return kvWriter;
+ }
+
+ @Override
+ public void handleEvents(List<Event> outputEvents) {
+ }
+
+ @Override
+ public synchronized List<Event> close() throws Exception {
+ if (isStarted.get()) {
+ return kvWriter.close();
+ } else {
+ return Collections.emptyList();
+ }
+ }
+ @Override
+ public synchronized void setNumPhysicalOutputs(int numOutputs) {
+ this.numPhysicalOutputs = numOutputs;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/HashPartitioner.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/HashPartitioner.java
new file mode 100644
index 0000000..d95a654
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/partitioner/HashPartitioner.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.partitioner;
+
+import org.apache.tez.runtime.library.api.Partitioner;
+
+public class HashPartitioner implements Partitioner {
+
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index c295601..f7dfe0a 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -35,6 +35,7 @@ import org.apache.tez.runtime.library.input.ShuffledMergedInput;
import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -185,6 +186,8 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator
requestType = RequestType.SORTED_MERGED_INPUT;
} else if (className.equals(ShuffledUnorderedKVInput.class.getName())) {
requestType = RequestType.UNSORTED_INPUT;
+ } else if (className.equals(OnFileUnorderedPartitionedKVOutput.class.getName())) {
+ requestType = RequestType.PARTITIONED_UNSORTED_OUTPUT;
} else {
requestType = RequestType.OTHER;
LOG.info("Falling back to RequestType.OTHER for class: " + className);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/26ad7037/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
new file mode 100644
index 0000000..8b51765
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -0,0 +1,673 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.writers;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.api.Partitioner;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
+import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
+import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+@RunWith(value = Parameterized.class)
+public class TestUnorderedPartitionedKVWriter {
+
+ private static final Log LOG = LogFactory.getLog(TestUnorderedPartitionedKVWriter.class);
+
+ // Expected host and shuffle port in the payload of the generated event (asserted in textTest).
+ private static final String HOST_STRING = "localhost";
+ private static final int SHUFFLE_PORT = 4000;
+
+ // Base directory comes from the test.build.data system property, defaulting to /tmp.
+ // TEST_ROOT_DIR is deleted and recreated before each test and deleted after it.
+ private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
+ private static final Path TEST_ROOT_DIR = new Path(testTmpDir,
+ TestUnorderedPartitionedKVWriter.class.getSimpleName());
+ private static FileSystem localFs;
+
+ // Set per run by the Parameterized runner from data(): false, then true.
+ private boolean shouldCompress;
+
+  /** @param shouldCompress whether this run configures the writer under test to compress */
+  public TestUnorderedPartitionedKVWriter(boolean shouldCompress) {
+    this.shouldCompress = shouldCompress;
+  }
+
+  /** Runs every test twice: once without compression and once with it. */
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] { { false }, { true } });
+  }
+
+ @Before
+ public void setup() throws IOException {
+ LOG.info("Setup. Using test dir: " + TEST_ROOT_DIR);
+ localFs = FileSystem.getLocal(new Configuration());
+ localFs.delete(TEST_ROOT_DIR, true);
+ localFs.mkdirs(TEST_ROOT_DIR);
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ LOG.info("CleanUp");
+ localFs.delete(TEST_ROOT_DIR, true);
+ }
+
+  /** Verifies how the available memory is split into buffers for several memory sizes. */
+  @Test(timeout = 10000)
+  public void testBufferSizing() throws IOException {
+    ApplicationId appId = ApplicationId.newInstance(10000, 1);
+    TezCounters counters = new TezCounters();
+    String uniqueId = UUID.randomUUID().toString();
+    TezOutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+
+    int maxSingleBufferSizeBytes = 2047;
+    Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
+        false, maxSingleBufferSizeBytes);
+
+    int numOutputs = 10;
+
+    // NOTE: the expected values are consistent with allocating
+    // ceil(availableMemory / maxSingleBufferSizeBytes) buffers, each of
+    // availableMemory / numBuffers bytes rounded down to a multiple of 4.
+    assertBufferLayout(
+        new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 2048), 2, 1024);
+    assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
+        maxSingleBufferSizeBytes * 3), 3, maxSingleBufferSizeBytes - maxSingleBufferSizeBytes % 4);
+    assertBufferLayout(new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs,
+        maxSingleBufferSizeBytes * 2 + 1), 3, 1364);
+    assertBufferLayout(
+        new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 10240), 6, 1704);
+  }
+
+  /** Checks a freshly created writer's buffer layout; only one buffer starts out initialized. */
+  private static void assertBufferLayout(UnorderedPartitionedKVWriter kvWriter,
+      int expectedNumBuffers, int expectedSizePerBuffer) {
+    assertEquals(expectedNumBuffers, kvWriter.numBuffers);
+    assertEquals(expectedSizePerBuffer, kvWriter.sizePerBuffer);
+    assertEquals(1, kvWriter.numInitializedBuffers);
+  }
+
+ // NOTE(review): baseTest's signature is not visible here; the arguments appear to be
+ // (numRecords, numPartitions, partitions to skip, shouldCompress) -- confirm against baseTest.
+ @Test(timeout = 10000)
+ public void testNoSpill() throws IOException, InterruptedException {
+ baseTest(10, 10, null, shouldCompress);
+ }
+
+ @Test(timeout = 10000)
+ public void testSingleSpill() throws IOException, InterruptedException {
+ baseTest(50, 10, null, shouldCompress);
+ }
+
+ @Test(timeout = 10000)
+ public void testMultipleSpills() throws IOException, InterruptedException {
+ baseTest(200, 10, null, shouldCompress);
+ }
+
+ @Test(timeout = 10000)
+ public void testNoRecords() throws IOException, InterruptedException {
+ baseTest(0, 10, null, shouldCompress);
+ }
+
+ // Presumably partitions 2 and 5 receive no records in this run -- confirm against baseTest.
+ @Test(timeout = 10000)
+ public void testSkippedPartitions() throws IOException, InterruptedException {
+ baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress);
+ }
+
+ // textTest(numRegularRecords, numPartitions, availableMemory, numLargeKeys, numLargevalues,
+ // numLargeKvPairs). Large keys/values are longer than one buffer; large kv pairs have a key and
+ // a value that are each longer than half a buffer.
+ @Test(timeout = 10000)
+ public void testRandomText() throws IOException, InterruptedException {
+ textTest(100, 10, 2048, 0, 0, 0);
+ }
+
+ @Test(timeout = 10000)
+ public void testLargeKeys() throws IOException, InterruptedException {
+ textTest(0, 10, 2048, 10, 0, 0);
+ }
+
+ @Test(timeout = 10000)
+ public void testLargevalues() throws IOException, InterruptedException {
+ textTest(0, 10, 2048, 0, 10, 0);
+ }
+
+ @Test(timeout = 10000)
+ public void testLargeKvPairs() throws IOException, InterruptedException {
+ textTest(0, 10, 2048, 0, 0, 10);
+ }
+
+ @Test(timeout = 10000)
+ public void testTextMixedRecords() throws IOException, InterruptedException {
+ textTest(100, 10, 2048, 10, 10, 10);
+ }
+
+ public void textTest(int numRegularRecords, int numPartitions, long availableMemory,
+ int numLargeKeys, int numLargevalues, int numLargeKvPairs) throws IOException,
+ InterruptedException {
+ Partitioner partitioner = new HashPartitioner();
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TezCounters counters = new TezCounters();
+ String uniqueId = UUID.randomUUID().toString();
+ TezOutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+ Random random = new Random();
+
+ Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress,
+ -1, HashPartitioner.class);
+ CompressionCodec codec = null;
+ if (shouldCompress) {
+ codec = new DefaultCodec();
+ ((Configurable) codec).setConf(conf);
+ }
+
+ int numRecordsWritten = 0;
+
+ Map<Integer, Multimap<String, String>> expectedValues = new HashMap<Integer, Multimap<String, String>>();
+ for (int i = 0; i < numPartitions; i++) {
+ expectedValues.put(i, LinkedListMultimap.<String, String> create());
+ }
+
+ UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
+ conf, numPartitions, availableMemory);
+
+ int sizePerBuffer = kvWriter.sizePerBuffer;
+
+ BitSet partitionsWithData = new BitSet(numPartitions);
+ Text keyText = new Text();
+ Text valText = new Text();
+ for (int i = 0; i < numRegularRecords; i++) {
+ String key = createRandomString(Math.abs(random.nextInt(10)));
+ String val = createRandomString(Math.abs(random.nextInt(20)));
+ keyText.set(key);
+ valText.set(val);
+ int partition = partitioner.getPartition(keyText, valText, numPartitions);
+ partitionsWithData.set(partition);
+ expectedValues.get(partition).put(key, val);
+ kvWriter.write(keyText, valText);
+ numRecordsWritten++;
+ }
+
+ // Write Large key records
+ for (int i = 0; i < numLargeKeys; i++) {
+ String key = createRandomString(sizePerBuffer + Math.abs(random.nextInt(100)));
+ String val = createRandomString(Math.abs(random.nextInt(20)));
+ keyText.set(key);
+ valText.set(val);
+ int partition = partitioner.getPartition(keyText, valText, numPartitions);
+ partitionsWithData.set(partition);
+ expectedValues.get(partition).put(key, val);
+ kvWriter.write(keyText, valText);
+ numRecordsWritten++;
+ }
+
+ // Write Large val records
+ for (int i = 0; i < numLargevalues; i++) {
+ String key = createRandomString(Math.abs(random.nextInt(10)));
+ String val = createRandomString(sizePerBuffer + Math.abs(random.nextInt(100)));
+ keyText.set(key);
+ valText.set(val);
+ int partition = partitioner.getPartition(keyText, valText, numPartitions);
+ partitionsWithData.set(partition);
+ expectedValues.get(partition).put(key, val);
+ kvWriter.write(keyText, valText);
+ numRecordsWritten++;
+ }
+
+ // Write records where key + val are large (but both can fit in the buffer individually)
+ for (int i = 0; i < numLargeKvPairs; i++) {
+ String key = createRandomString(sizePerBuffer / 2 + Math.abs(random.nextInt(100)));
+ String val = createRandomString(sizePerBuffer / 2 + Math.abs(random.nextInt(100)));
+ keyText.set(key);
+ valText.set(val);
+ int partition = partitioner.getPartition(keyText, valText, numPartitions);
+ partitionsWithData.set(partition);
+ expectedValues.get(partition).put(key, val);
+ kvWriter.write(keyText, valText);
+ numRecordsWritten++;
+ }
+
+ List<Event> events = kvWriter.close();
+ verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class));
+
+ TezCounter outputLargeRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS);
+ assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs,
+ outputLargeRecordsCounter.getValue());
+
+ // Validate the event
+ assertEquals(1, events.size());
+ assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
+ CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
+ assertEquals(0, cdme.getSourceIndexStart());
+ assertEquals(numPartitions, cdme.getSourceIndexEnd());
+ DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme
+ .getUserPayload());
+ assertFalse(eventProto.hasData());
+ BitSet emptyPartitionBits = null;
+ if (partitionsWithData.cardinality() != numPartitions) {
+ assertTrue(eventProto.hasEmptyPartitions());
+ byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(eventProto
+ .getEmptyPartitions());
+ emptyPartitionBits = TezUtils.fromByteArray(emptyPartitions);
+ assertEquals(numPartitions - partitionsWithData.cardinality(),
+ emptyPartitionBits.cardinality());
+ } else {
+ assertFalse(eventProto.hasEmptyPartitions());
+ emptyPartitionBits = new BitSet(numPartitions);
+ }
+ assertEquals(HOST_STRING, eventProto.getHost());
+ assertEquals(SHUFFLE_PORT, eventProto.getPort());
+ assertEquals(uniqueId, eventProto.getPathComponent());
+
+ // Verify the data
+ // Verify the actual data
+ TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+ Path outputFilePath = null;
+ Path spillFilePath = null;
+ try {
+ outputFilePath = taskOutput.getOutputFile();
+ } catch (DiskErrorException e) {
+ if (numRecordsWritten > 0) {
+ fail();
+ } else {
+ // Record checking not required.
+ return;
+ }
+ }
+ try {
+ spillFilePath = taskOutput.getOutputIndexFile();
+ } catch (DiskErrorException e) {
+ if (numRecordsWritten > 0) {
+ fail();
+ } else {
+ // Record checking not required.
+ return;
+ }
+ }
+
+ // Special case for 0 records.
+ TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
+ DataInputBuffer keyBuffer = new DataInputBuffer();
+ DataInputBuffer valBuffer = new DataInputBuffer();
+ Text keyDeser = new Text();
+ Text valDeser = new Text();
+ for (int i = 0; i < numPartitions; i++) {
+ if (emptyPartitionBits.get(i)) {
+ continue;
+ }
+ TezIndexRecord indexRecord = spillRecord.getIndex(i);
+ FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
+ inStream.seek(indexRecord.getStartOffset());
+ IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null,
+ null, false, 0, -1);
+ while (reader.nextRawKey(keyBuffer)) {
+ reader.nextRawValue(valBuffer);
+ keyDeser.readFields(keyBuffer);
+ valDeser.readFields(valBuffer);
+ int partition = partitioner.getPartition(keyDeser, valDeser, numPartitions);
+ assertTrue(expectedValues.get(partition).remove(keyDeser.toString(), valDeser.toString()));
+ }
+ inStream.close();
+ }
+ for (int i = 0; i < numPartitions; i++) {
+ assertEquals(0, expectedValues.get(i).size());
+ expectedValues.remove(i);
+ }
+ assertEquals(0, expectedValues.size());
+ }
+
+ private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions,
+ boolean shouldCompress) throws IOException, InterruptedException {
+ PartitionerForTest partitioner = new PartitionerForTest();
+ ApplicationId appId = ApplicationId.newInstance(10000, 1);
+ TezCounters counters = new TezCounters();
+ String uniqueId = UUID.randomUUID().toString();
+ TezOutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
+
+ Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class,
+ shouldCompress, -1);
+ CompressionCodec codec = null;
+ if (shouldCompress) {
+ codec = new DefaultCodec();
+ ((Configurable) codec).setConf(conf);
+ }
+
+ int numOutputs = numPartitions;
+ long availableMemory = 2048;
+ int numRecordsWritten = 0;
+
+ Map<Integer, Multimap<Integer, Long>> expectedValues = new HashMap<Integer, Multimap<Integer, Long>>();
+ for (int i = 0; i < numOutputs; i++) {
+ expectedValues.put(i, LinkedListMultimap.<Integer, Long> create());
+ }
+
+ UnorderedPartitionedKVWriter kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext,
+ conf, numOutputs, availableMemory);
+
+ int sizePerBuffer = kvWriter.sizePerBuffer;
+ int sizePerRecord = 4 + 8; // IntW + LongW
+ int sizePerRecordWithOverhead = sizePerRecord + 12; // Record + META_OVERHEAD
+
+ IntWritable intWritable = new IntWritable();
+ LongWritable longWritable = new LongWritable();
+ for (int i = 0; i < numRecords; i++) {
+ intWritable.set(i);
+ longWritable.set(i);
+ int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
+ if (skippedPartitions != null && skippedPartitions.contains(partition)) {
+ continue;
+ }
+ expectedValues.get(partition).put(intWritable.get(), longWritable.get());
+ kvWriter.write(intWritable, longWritable);
+ numRecordsWritten++;
+ }
+ List<Event> events = kvWriter.close();
+
+ int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
+ int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
+
+ verify(outputContext, never()).fatalError(any(Throwable.class), any(String.class));
+
+ // Verify the status of the buffers
+ if (numExpectedSpills == 0) {
+ assertEquals(1, kvWriter.numInitializedBuffers);
+ } else {
+ assertTrue(kvWriter.numInitializedBuffers > 1);
+ }
+ assertNull(kvWriter.currentBuffer);
+ assertEquals(0, kvWriter.availableBuffers.size());
+
+ // Verify the counters
+ TezCounter outputRecordBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES);
+ TezCounter outputRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_RECORDS);
+ TezCounter outputBytesWithOverheadCounter = counters
+ .findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
+ TezCounter fileOutputBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
+ TezCounter spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
+ TezCounter additionalSpillBytesWritternCounter = counters
+ .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
+ TezCounter additionalSpillBytesReadCounter = counters
+ .findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+ TezCounter numAdditionalSpillsCounter = counters
+ .findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
+ assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue());
+ assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
+ assertEquals(numRecordsWritten * sizePerRecordWithOverhead,
+ outputBytesWithOverheadCounter.getValue());
+ long fileOutputBytes = fileOutputBytesCounter.getValue();
+ if (numRecordsWritten > 0) {
+ assertTrue(fileOutputBytes > 0);
+ if (!shouldCompress) {
+ assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue());
+ }
+ } else {
+ assertEquals(0, fileOutputBytes);
+ }
+ assertEquals(recordsPerBuffer * numExpectedSpills, spilledRecordsCounter.getValue());
+ long additionalSpillBytesWritten = additionalSpillBytesWritternCounter.getValue();
+ long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
+ if (numExpectedSpills == 0) {
+ assertEquals(0, additionalSpillBytesWritten);
+ assertEquals(0, additionalSpillBytesRead);
+ } else {
+ assertTrue(additionalSpillBytesWritten > 0);
+ assertTrue(additionalSpillBytesRead > 0);
+ if (!shouldCompress) {
+ assertTrue(additionalSpillBytesWritten > (recordsPerBuffer * numExpectedSpills * sizePerRecord));
+ assertTrue(additionalSpillBytesRead > (recordsPerBuffer * numExpectedSpills * sizePerRecord));
+ }
+ }
+ assertTrue(additionalSpillBytesWritten == additionalSpillBytesRead);
+ assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue());
+
+ BitSet emptyPartitionBits = null;
+ // Verify the event returned
+ assertEquals(1, events.size());
+ assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
+ CompositeDataMovementEvent cdme = (CompositeDataMovementEvent) events.get(0);
+ assertEquals(0, cdme.getSourceIndexStart());
+ assertEquals(numOutputs, cdme.getSourceIndexEnd());
+ DataMovementEventPayloadProto eventProto = DataMovementEventPayloadProto.parseFrom(cdme
+ .getUserPayload());
+ assertFalse(eventProto.hasData());
+ if (skippedPartitions == null && numRecordsWritten > 0) {
+ assertFalse(eventProto.hasEmptyPartitions());
+ emptyPartitionBits = new BitSet(numPartitions);
+ } else {
+ assertTrue(eventProto.hasEmptyPartitions());
+ byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(eventProto
+ .getEmptyPartitions());
+ emptyPartitionBits = TezUtils.fromByteArray(emptyPartitions);
+ if (numRecordsWritten == 0) {
+ assertEquals(numPartitions, emptyPartitionBits.cardinality());
+ } else {
+ for (Integer e : skippedPartitions) {
+ assertTrue(emptyPartitionBits.get(e));
+ }
+ assertEquals(skippedPartitions.size(), emptyPartitionBits.cardinality());
+ }
+ }
+ if (emptyPartitionBits.cardinality() != numPartitions) {
+ assertEquals(HOST_STRING, eventProto.getHost());
+ assertEquals(SHUFFLE_PORT, eventProto.getPort());
+ assertEquals(uniqueId, eventProto.getPathComponent());
+ } else {
+ assertFalse(eventProto.hasHost());
+ assertFalse(eventProto.hasPort());
+ assertFalse(eventProto.hasPathComponent());
+ }
+
+ // Verify the actual data
+ TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
+ Path outputFilePath = null;
+ Path spillFilePath = null;
+ try {
+ outputFilePath = taskOutput.getOutputFile();
+ } catch (DiskErrorException e) {
+ if (numRecordsWritten > 0) {
+ fail();
+ } else {
+ // Record checking not required.
+ return;
+ }
+ }
+ try {
+ spillFilePath = taskOutput.getOutputIndexFile();
+ } catch (DiskErrorException e) {
+ if (numRecordsWritten > 0) {
+ fail();
+ } else {
+ // Record checking not required.
+ return;
+ }
+ }
+
+ // Special case for 0 records.
+ TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
+ DataInputBuffer keyBuffer = new DataInputBuffer();
+ DataInputBuffer valBuffer = new DataInputBuffer();
+ IntWritable keyDeser = new IntWritable();
+ LongWritable valDeser = new LongWritable();
+ for (int i = 0; i < numOutputs; i++) {
+ if (skippedPartitions != null && skippedPartitions.contains(i)) {
+ continue;
+ }
+ TezIndexRecord indexRecord = spillRecord.getIndex(i);
+ FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
+ inStream.seek(indexRecord.getStartOffset());
+ IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null,
+ null, false, 0, -1);
+ while (reader.nextRawKey(keyBuffer)) {
+ reader.nextRawValue(valBuffer);
+ keyDeser.readFields(keyBuffer);
+ valDeser.readFields(valBuffer);
+ int partition = partitioner.getPartition(keyDeser, valDeser, numOutputs);
+ assertTrue(expectedValues.get(partition).remove(keyDeser.get(), valDeser.get()));
+ }
+ inStream.close();
+ }
+ for (int i = 0; i < numOutputs; i++) {
+ assertEquals(0, expectedValues.get(i).size());
+ expectedValues.remove(i);
+ }
+ assertEquals(0, expectedValues.size());
+ }
+
+ /**
+  * Returns a string of {@code size} random upper-case ASCII letters ('A'..'Z'). A non-positive
+  * size yields the empty string.
+  */
+ private static String createRandomString(int size) {
+   StringBuilder sb = new StringBuilder();
+   Random random = new Random();
+   for (int i = 0; i < size; i++) {
+     // nextInt(26) is always in [0, 26). The previous Math.abs(nextInt()) % 26 is negative
+     // when nextInt() returns Integer.MIN_VALUE and would have appended a non-letter.
+     sb.append((char) ('A' + random.nextInt(26)));
+   }
+   return sb.toString();
+ }
+
+ /**
+  * Builds a Mockito mock of {@link TezOutputContext} for the writer under test.
+  *
+  * The mock returns the given counters, application id and unique identifier. It stubs fixed
+  * DAG/vertex/task identity values, and returns {@code SHUFFLE_PORT} encoded as a 4-byte int as
+  * the shuffle handler's service metadata. Its single work directory is
+  * {@code TEST_ROOT_DIR/outDir_<uniqueId>}.
+  */
+ private TezOutputContext createMockOutputContext(TezCounters counters, ApplicationId appId,
+     String uniqueId) {
+   TezOutputContext context = mock(TezOutputContext.class);
+
+   // Identity of the DAG / vertex / task owning this output.
+   doReturn(appId).when(context).getApplicationId();
+   doReturn("dagName").when(context).getDAGName();
+   doReturn(1).when(context).getDAGAttemptNumber();
+   doReturn("vertexName").when(context).getTaskVertexName();
+   doReturn(1).when(context).getTaskVertexIndex();
+   doReturn(1).when(context).getTaskIndex();
+   doReturn(1).when(context).getTaskAttemptNumber();
+   doReturn("destinationVertexName").when(context).getDestinationVertexName();
+   doReturn(1).when(context).getOutputIndex();
+   doReturn(uniqueId).when(context).getUniqueIdentifier();
+   doReturn(counters).when(context).getCounters();
+
+   // Port metadata: write the int, then rewind to the marked start so readers see it from 0.
+   ByteBuffer shufflePortMeta = ByteBuffer.allocate(4);
+   shufflePortMeta.mark();
+   shufflePortMeta.putInt(SHUFFLE_PORT);
+   shufflePortMeta.reset();
+   doReturn(shufflePortMeta).when(context).getServiceProviderMetaData(
+       eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));
+
+   String workDir = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId).toString();
+   doReturn(new String[] { workDir }).when(context).getWorkDirs();
+   return context;
+ }
+
+ /**
+  * Creates a test configuration that uses {@link PartitionerForTest} as the partitioner. All
+  * other settings are delegated to the overload taking an explicit partitioner class.
+  */
+ private Configuration createConfiguration(TezOutputContext outputContext,
+     Class<? extends Writable> keyClass, Class<? extends Writable> valClass,
+     boolean shouldCompress, int maxSingleBufferSizeBytes) {
+   Class<? extends Partitioner> defaultPartitioner = PartitionerForTest.class;
+   return createConfiguration(outputContext, keyClass, valClass, shouldCompress,
+       maxSingleBufferSizeBytes, defaultPartitioner);
+ }
+
+ /**
+  * Creates a {@link Configuration} (without Hadoop default resources) for the writer under test.
+  *
+  * @param outputContext supplies the local directories via {@code getWorkDirs()}
+  * @param keyClass intermediate output key class
+  * @param valClass intermediate output value class
+  * @param shouldCompress whether intermediate output is compressed with {@link DefaultCodec}
+  * @param maxSingleBufferSizeBytes per-buffer size cap; a negative value leaves it unset
+  * @param partitionerClass partitioner to configure
+  * @return the populated configuration
+  */
+ private Configuration createConfiguration(TezOutputContext outputContext,
+     Class<? extends Writable> keyClass, Class<? extends Writable> valClass,
+     boolean shouldCompress, int maxSingleBufferSizeBytes,
+     Class<? extends Partitioner> partitionerClass) {
+   Configuration config = new Configuration(false);
+   config.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+
+   // Record types and partitioning.
+   config.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClass.getName());
+   config.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, valClass.getName());
+   config.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, partitionerClass.getName());
+
+   // Compression.
+   config.setBoolean(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS, shouldCompress);
+   if (shouldCompress) {
+     config.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC,
+         DefaultCodec.class.getName());
+   }
+
+   // Optional per-buffer size cap; negative means "do not set".
+   boolean capBufferSize = maxSingleBufferSizeBytes >= 0;
+   if (capBufferSize) {
+     config.setInt(TezJobConfig.TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES,
+         maxSingleBufferSizeBytes);
+   }
+   return config;
+ }
+
+ public static class PartitionerForTest implements Partitioner {
+ @Override
+ public int getPartition(Object key, Object value, int numPartitions) {
+ if (key instanceof IntWritable) {
+ return ((IntWritable) key).get() % numPartitions;
+ } else {
+ throw new UnsupportedOperationException(
+ "Test partitioner expected to be called with IntWritable only");
+ }
+ }
+ }
+
+ /**
+  * {@link UnorderedPartitionedKVWriter} whose reported host is the fixed {@code HOST_STRING}.
+  * The tests compare that value against the host in the emitted event payload.
+  */
+ private static class UnorderedPartitionedKVWriterForTest extends UnorderedPartitionedKVWriter {
+
+   public UnorderedPartitionedKVWriterForTest(TezOutputContext context, Configuration config,
+       int partitionCount, long memoryBytes) throws IOException {
+     super(context, config, partitionCount, memoryBytes);
+   }
+
+   /** Always returns {@code HOST_STRING}. */
+   @Override
+   String getHost() {
+     return HOST_STRING;
+   }
+ }
+}