Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/25 00:44:12 UTC
[03/20] Rename tez-engine-api to tez-runtime-api and split tez-engine
into 2: - tez-engine-library for user-visible Input/Output/Processor
implementations - tez-engine-internals for framework internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
new file mode 100644
index 0000000..bb4b4a2
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -0,0 +1,798 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.util.PriorityQueue;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+
+/**
+ * Merger is a utility class used by the Map and Reduce tasks for merging
+ * both their in-memory and on-disk segments.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TezMerger {
+ private static final Log LOG = LogFactory.getLog(TezMerger.class);
+
+
+ // Local directories
+ private static LocalDirAllocator lDirAlloc =
+ new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+ public static
+ TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class keyClass, Class valueClass,
+ CompressionCodec codec,
+ Path[] inputs, boolean deleteInputs,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return
+ new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
+ reporter, null).merge(keyClass, valueClass,
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ mergePhase);
+ }
+
+ public static
+ TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class keyClass, Class valueClass,
+ CompressionCodec codec,
+ Path[] inputs, boolean deleteInputs,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator,
+ Progressable reporter,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ TezCounter mergedMapOutputsCounter,
+ Progress mergePhase)
+ throws IOException {
+ return
+ new MergeQueue(conf, fs, inputs, deleteInputs, codec, comparator,
+ reporter, mergedMapOutputsCounter).merge(
+ keyClass, valueClass,
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ mergePhase);
+ }
+
+ public static
+ TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class keyClass, Class valueClass,
+ List<Segment> segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
+ comparator, reporter, false, readsCounter, writesCounter,
+ mergePhase);
+ }
+
+ public static <K extends Object, V extends Object>
+ TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class keyClass, Class valueClass,
+ List<Segment> segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ boolean sortSegments,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return new MergeQueue(conf, fs, segments, comparator, reporter,
+ sortSegments).merge(keyClass, valueClass,
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ mergePhase);
+ }
+
+ public static <K extends Object, V extends Object>
+ TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class keyClass, Class valueClass,
+ CompressionCodec codec,
+ List<Segment> segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ boolean sortSegments,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return new MergeQueue(conf, fs, segments, comparator, reporter,
+ sortSegments, codec).merge(keyClass, valueClass,
+ mergeFactor, tmpDir,
+ readsCounter, writesCounter,
+ mergePhase);
+ }
+
+ public static <K extends Object, V extends Object>
+ TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class keyClass, Class valueClass,
+ List<Segment> segments,
+ int mergeFactor, int inMemSegments, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ boolean sortSegments,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return new MergeQueue(conf, fs, segments, comparator, reporter,
+ sortSegments).merge(keyClass, valueClass,
+ mergeFactor, inMemSegments,
+ tmpDir,
+ readsCounter, writesCounter,
+ mergePhase);
+ }
+
+
+ static <K extends Object, V extends Object>
+ TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class keyClass, Class valueClass,
+ CompressionCodec codec,
+ List<Segment> segments,
+ int mergeFactor, int inMemSegments, Path tmpDir,
+ RawComparator comparator, Progressable reporter,
+ boolean sortSegments,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return new MergeQueue(conf, fs, segments, comparator, reporter,
+ sortSegments, codec).merge(keyClass, valueClass,
+ mergeFactor, inMemSegments,
+ tmpDir,
+ readsCounter, writesCounter,
+ mergePhase);
+  }
+
+ public static <K extends Object, V extends Object>
+ void writeFile(TezRawKeyValueIterator records, Writer writer,
+ Progressable progressable, Configuration conf)
+ throws IOException {
+ long progressBar =
+ conf.getLong(TezJobConfig.RECORDS_BEFORE_PROGRESS,
+ TezJobConfig.DEFAULT_RECORDS_BEFORE_PROGRESS);
+ long recordCtr = 0;
+ while(records.next()) {
+ writer.append(records.getKey(), records.getValue());
+
+ if (((recordCtr++) % progressBar) == 0) {
+ progressable.progress();
+ }
+ }
+  }
+
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static class Segment<K extends Object, V extends Object> {
+ Reader reader = null;
+ final DataInputBuffer key = new DataInputBuffer();
+
+ Configuration conf = null;
+ FileSystem fs = null;
+ Path file = null;
+ boolean preserve = false;
+ CompressionCodec codec = null;
+ long segmentOffset = 0;
+ long segmentLength = -1;
+
+ TezCounter mapOutputsCounter = null;
+
+ public Segment(Configuration conf, FileSystem fs, Path file,
+ CompressionCodec codec, boolean preserve)
+ throws IOException {
+ this(conf, fs, file, codec, preserve, null);
+ }
+
+ public Segment(Configuration conf, FileSystem fs, Path file,
+ CompressionCodec codec, boolean preserve,
+ TezCounter mergedMapOutputsCounter)
+ throws IOException {
+ this(conf, fs, file, 0, fs.getFileStatus(file).getLen(), codec, preserve,
+ mergedMapOutputsCounter);
+ }
+
+ public Segment(Configuration conf, FileSystem fs, Path file,
+ long segmentOffset, long segmentLength,
+ CompressionCodec codec,
+ boolean preserve) throws IOException {
+ this(conf, fs, file, segmentOffset, segmentLength, codec, preserve, null);
+ }
+
+ public Segment(Configuration conf, FileSystem fs, Path file,
+ long segmentOffset, long segmentLength, CompressionCodec codec,
+ boolean preserve, TezCounter mergedMapOutputsCounter)
+ throws IOException {
+ this.conf = conf;
+ this.fs = fs;
+ this.file = file;
+ this.codec = codec;
+ this.preserve = preserve;
+
+ this.segmentOffset = segmentOffset;
+ this.segmentLength = segmentLength;
+
+ this.mapOutputsCounter = mergedMapOutputsCounter;
+ }
+
+ public Segment(Reader reader, boolean preserve) {
+ this(reader, preserve, null);
+ }
+
+ public Segment(Reader reader, boolean preserve,
+ TezCounter mapOutputsCounter) {
+ this.reader = reader;
+ this.preserve = preserve;
+
+ this.segmentLength = reader.getLength();
+
+ this.mapOutputsCounter = mapOutputsCounter;
+ }
+
+ void init(TezCounter readsCounter) throws IOException {
+ if (reader == null) {
+ FSDataInputStream in = fs.open(file);
+ in.seek(segmentOffset);
+ reader = new Reader(conf, in, segmentLength, codec, readsCounter);
+ }
+
+ if (mapOutputsCounter != null) {
+ mapOutputsCounter.increment(1);
+ }
+ }
+
+ boolean inMemory() {
+ return fs == null;
+ }
+
+ DataInputBuffer getKey() { return key; }
+
+ DataInputBuffer getValue(DataInputBuffer value) throws IOException {
+ nextRawValue(value);
+ return value;
+ }
+
+ public long getLength() {
+ return (reader == null) ?
+ segmentLength : reader.getLength();
+ }
+
+ boolean nextRawKey() throws IOException {
+ return reader.nextRawKey(key);
+ }
+
+ void nextRawValue(DataInputBuffer value) throws IOException {
+ reader.nextRawValue(value);
+ }
+
+ void closeReader() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ }
+
+ void close() throws IOException {
+ closeReader();
+ if (!preserve && fs != null) {
+ fs.delete(file, false);
+ }
+ }
+
+ public long getPosition() throws IOException {
+ return reader.getPosition();
+ }
+
+ // This method is used by BackupStore to extract the
+ // absolute position after a reset
+ long getActualPosition() throws IOException {
+ return segmentOffset + reader.getPosition();
+ }
+
+ Reader getReader() {
+ return reader;
+ }
+
+ // This method is used by BackupStore to reinitialize the
+ // reader to start reading from a different segment offset
+ void reinitReader(int offset) throws IOException {
+ if (!inMemory()) {
+ closeReader();
+ segmentOffset = offset;
+ segmentLength = fs.getFileStatus(file).getLen() - segmentOffset;
+ init(null);
+ }
+ }
+ }
+
+  // Whether to include the final merge as part of the sort phase. This is
+  // true in the map task and false in the reduce task; it is used in
+  // calculating mergeProgress.
+ static boolean includeFinalMerge = false;
+
+ /**
+   * Sets the boolean variable includeFinalMerge to true. Called from the
+   * map task before calling merge() so that the final merge of the map task
+   * is also considered part of the sort phase.
+ */
+ public static void considerFinalMergeForProgress() {
+ includeFinalMerge = true;
+ }
+
+ private static class MergeQueue<K extends Object, V extends Object>
+ extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
+ Configuration conf;
+ FileSystem fs;
+ CompressionCodec codec;
+
+ List<Segment> segments = new ArrayList<Segment>();
+
+ RawComparator comparator;
+
+ private long totalBytesProcessed;
+ private float progPerByte;
+ private Progress mergeProgress = new Progress();
+
+ Progressable reporter;
+
+ DataInputBuffer key;
+ final DataInputBuffer value = new DataInputBuffer();
+ final DataInputBuffer diskIFileValue = new DataInputBuffer();
+
+ Segment minSegment;
+ Comparator<Segment> segmentComparator =
+ new Comparator<Segment>() {
+ public int compare(Segment o1, Segment o2) {
+ if (o1.getLength() == o2.getLength()) {
+ return 0;
+ }
+
+ return o1.getLength() < o2.getLength() ? -1 : 1;
+ }
+ };
+
+ public MergeQueue(Configuration conf, FileSystem fs,
+ Path[] inputs, boolean deleteInputs,
+ CompressionCodec codec, RawComparator comparator,
+ Progressable reporter,
+ TezCounter mergedMapOutputsCounter)
+ throws IOException {
+ this.conf = conf;
+ this.fs = fs;
+ this.codec = codec;
+ this.comparator = comparator;
+ this.reporter = reporter;
+
+ for (Path file : inputs) {
+ LOG.debug("MergeQ: adding: " + file);
+ segments.add(new Segment(conf, fs, file, codec, !deleteInputs,
+ (file.toString().endsWith(
+ Constants.MERGED_OUTPUT_PREFIX) ?
+ null : mergedMapOutputsCounter)));
+ }
+
+ // Sort segments on file-lengths
+ Collections.sort(segments, segmentComparator);
+ }
+
+ public MergeQueue(Configuration conf, FileSystem fs,
+ List<Segment> segments, RawComparator comparator,
+ Progressable reporter, boolean sortSegments) {
+ this.conf = conf;
+ this.fs = fs;
+ this.comparator = comparator;
+ this.segments = segments;
+ this.reporter = reporter;
+ if (sortSegments) {
+ Collections.sort(segments, segmentComparator);
+ }
+ }
+
+ public MergeQueue(Configuration conf, FileSystem fs,
+ List<Segment> segments, RawComparator comparator,
+ Progressable reporter, boolean sortSegments, CompressionCodec codec) {
+ this(conf, fs, segments, comparator, reporter, sortSegments);
+ this.codec = codec;
+ }
+
+ public void close() throws IOException {
+ Segment segment;
+ while((segment = pop()) != null) {
+ segment.close();
+ }
+ }
+
+ public DataInputBuffer getKey() throws IOException {
+ return key;
+ }
+
+ public DataInputBuffer getValue() throws IOException {
+ return value;
+ }
+
+    private void adjustPriorityQueue(Segment reader) throws IOException {
+ long startPos = reader.getPosition();
+ boolean hasNext = reader.nextRawKey();
+ long endPos = reader.getPosition();
+ totalBytesProcessed += endPos - startPos;
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+ if (hasNext) {
+ adjustTop();
+ } else {
+ pop();
+ reader.close();
+ }
+ }
+
+ public boolean next() throws IOException {
+ if (size() == 0)
+ return false;
+
+ if (minSegment != null) {
+ //minSegment is non-null for all invocations of next except the first
+ //one. For the first invocation, the priority queue is ready for use
+ //but for the subsequent invocations, first adjust the queue
+ adjustPriorityQueue(minSegment);
+ if (size() == 0) {
+ minSegment = null;
+ return false;
+ }
+ }
+ minSegment = top();
+ if (!minSegment.inMemory()) {
+        //When we load the value from an in-memory segment, we reset the
+        //"value" DIB in this class to the in-memory segment's byte[]. When
+        //we load the value bytes from disk, we shouldn't reuse that same
+        //byte[], since doing so would corrupt the data in the in-memory
+        //segment. So we maintain an explicit DIB for value bytes obtained
+        //from disk, and whenever the current segment is a disk segment we
+        //reset the "value" DIB to that disk DIB (so the disk-segment DIB
+        //is reused whenever we consider a disk segment).
+ value.reset(diskIFileValue.getData(), diskIFileValue.getLength());
+ }
+ long startPos = minSegment.getPosition();
+ key = minSegment.getKey();
+ minSegment.getValue(value);
+ long endPos = minSegment.getPosition();
+ totalBytesProcessed += endPos - startPos;
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+ return true;
+ }
+
+ protected boolean lessThan(Object a, Object b) {
+ DataInputBuffer key1 = ((Segment)a).getKey();
+ DataInputBuffer key2 = ((Segment)b).getKey();
+ int s1 = key1.getPosition();
+ int l1 = key1.getLength() - s1;
+ int s2 = key2.getPosition();
+ int l2 = key2.getLength() - s2;
+
+ return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
+ }
+
+ public TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+ int factor, Path tmpDir,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ return merge(keyClass, valueClass, factor, 0, tmpDir,
+ readsCounter, writesCounter, mergePhase);
+ }
+
+ TezRawKeyValueIterator merge(Class keyClass, Class valueClass,
+ int factor, int inMem, Path tmpDir,
+ TezCounter readsCounter,
+ TezCounter writesCounter,
+ Progress mergePhase)
+ throws IOException {
+ LOG.info("Merging " + segments.size() + " sorted segments");
+
+ /*
+       * If there are in-memory segments, they come first in the segments
+       * list, followed by the sorted disk segments. Otherwise (if there are
+       * only disk segments), the segments are sorted only if there are more
+       * than factor segments in the list.
+ */
+ int numSegments = segments.size();
+ int origFactor = factor;
+ int passNo = 1;
+ if (mergePhase != null) {
+ mergeProgress = mergePhase;
+ }
+
+ long totalBytes = computeBytesInMerges(factor, inMem);
+ if (totalBytes != 0) {
+ progPerByte = 1.0f / (float)totalBytes;
+ }
+
+ //create the MergeStreams from the sorted map created in the constructor
+ //and dump the final output to a file
+ do {
+ //get the factor for this pass of merge. We assume in-memory segments
+ //are the first entries in the segment list and that the pass factor
+ //doesn't apply to them
+ factor = getPassFactor(factor, passNo, numSegments - inMem);
+ if (1 == passNo) {
+ factor += inMem;
+ }
+ List<Segment> segmentsToMerge =
+ new ArrayList<Segment>();
+ int segmentsConsidered = 0;
+ int numSegmentsToConsider = factor;
+ long startBytes = 0; // starting bytes of segments of this merge
+ while (true) {
+ //extract the smallest 'factor' number of segments
+ //Call cleanup on the empty segments (no key/value data)
+ List<Segment> mStream =
+ getSegmentDescriptors(numSegmentsToConsider);
+ for (Segment segment : mStream) {
+ // Initialize the segment at the last possible moment;
+ // this helps in ensuring we don't use buffers until we need them
+ segment.init(readsCounter);
+ long startPos = segment.getPosition();
+ boolean hasNext = segment.nextRawKey();
+ long endPos = segment.getPosition();
+
+ if (hasNext) {
+ startBytes += endPos - startPos;
+ segmentsToMerge.add(segment);
+ segmentsConsidered++;
+ }
+ else {
+ segment.close();
+ numSegments--; //we ignore this segment for the merge
+ }
+ }
+ //if we have the desired number of segments
+ //or looked at all available segments, we break
+ if (segmentsConsidered == factor ||
+ segments.size() == 0) {
+ break;
+ }
+
+ numSegmentsToConsider = factor - segmentsConsidered;
+ }
+
+ //feed the streams to the priority queue
+ initialize(segmentsToMerge.size());
+ clear();
+ for (Segment segment : segmentsToMerge) {
+ put(segment);
+ }
+
+      //if the number of remaining segments is within the factor, just return
+      //the iterator; else do another single-level merge
+ if (numSegments <= factor) {
+ if (!includeFinalMerge) { // for reduce task
+
+ // Reset totalBytesProcessed and recalculate totalBytes from the
+ // remaining segments to track the progress of the final merge.
+ // Final merge is considered as the progress of the reducePhase,
+ // the 3rd phase of reduce task.
+ totalBytesProcessed = 0;
+ totalBytes = 0;
+ for (int i = 0; i < segmentsToMerge.size(); i++) {
+ totalBytes += segmentsToMerge.get(i).getLength();
+ }
+ }
+ if (totalBytes != 0) //being paranoid
+ progPerByte = 1.0f / (float)totalBytes;
+
+ totalBytesProcessed += startBytes;
+ if (totalBytes != 0)
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+ else
+ mergeProgress.set(1.0f); // Last pass and no segments left - we're done
+
+ LOG.info("Down to the last merge-pass, with " + numSegments +
+ " segments left of total size: " +
+ (totalBytes - totalBytesProcessed) + " bytes");
+ return this;
+ } else {
+ LOG.info("Merging " + segmentsToMerge.size() +
+ " intermediate segments out of a total of " +
+ (segments.size()+segmentsToMerge.size()));
+
+ long bytesProcessedInPrevMerges = totalBytesProcessed;
+ totalBytesProcessed += startBytes;
+
+ //we want to spread the creation of temp files on multiple disks if
+ //available under the space constraints
+ long approxOutputSize = 0;
+ for (Segment s : segmentsToMerge) {
+ approxOutputSize += s.getLength() +
+ ChecksumFileSystem.getApproxChkSumLength(
+ s.getLength());
+ }
+ Path tmpFilename =
+ new Path(tmpDir, "intermediate").suffix("." + passNo);
+
+ Path outputFile = lDirAlloc.getLocalPathForWrite(
+ tmpFilename.toString(),
+ approxOutputSize, conf);
+
+ Writer writer =
+ new Writer(conf, fs, outputFile, keyClass, valueClass, codec,
+ writesCounter);
+ writeFile(this, writer, reporter, conf);
+ writer.close();
+
+ //we finished one single level merge; now clean up the priority
+ //queue
+ this.close();
+
+        // Add the newly created segment to the list of segments to be merged
+ Segment tempSegment =
+ new Segment(conf, fs, outputFile, codec, false);
+
+ // Insert new merged segment into the sorted list
+ int pos = Collections.binarySearch(segments, tempSegment,
+ segmentComparator);
+ if (pos < 0) {
+            // binary search failed; the insertion point is -pos-1
+ pos = -pos-1;
+ }
+ segments.add(pos, tempSegment);
+ numSegments = segments.size();
+
+          // Subtract the difference between the expected size of the new
+          // segment (inputBytesOfThisMerge) and its actual size from
+          // totalBytes. Expected and actual sizes will match (almost) if
+          // the combiner is not called during the merge.
+ long inputBytesOfThisMerge = totalBytesProcessed -
+ bytesProcessedInPrevMerges;
+ totalBytes -= inputBytesOfThisMerge - tempSegment.getLength();
+ if (totalBytes != 0) {
+ progPerByte = 1.0f / (float)totalBytes;
+ }
+
+ passNo++;
+ }
+      //only the first merge pass uses a modified factor, so reset the
+      //factor to its original value
+ factor = origFactor;
+ } while(true);
+ }
+
+ /**
+ * Determine the number of segments to merge in a given pass. Assuming more
+ * than factor segments, the first pass should attempt to bring the total
+ * number of segments - 1 to be divisible by the factor - 1 (each pass
+ * takes X segments and produces 1) to minimize the number of merges.
+ */
+ private int getPassFactor(int factor, int passNo, int numSegments) {
+ if (passNo > 1 || numSegments <= factor || factor == 1)
+ return factor;
+ int mod = (numSegments - 1) % (factor - 1);
+ if (mod == 0)
+ return factor;
+ return mod + 1;
+ }
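
With 13 segments and factor 10, getPassFactor returns (13 - 1) % (10 - 1) + 1
= 4 for the first pass, so 13 segments become 10, and the second pass merges
all 10 in one go. A standalone sketch (plain Java, mirroring the method above;
the driver loop is hypothetical and ignores in-memory segments):

    public class PassFactorDemo {
      static int getPassFactor(int factor, int passNo, int numSegments) {
        if (passNo > 1 || numSegments <= factor || factor == 1)
          return factor;
        int mod = (numSegments - 1) % (factor - 1);
        return (mod == 0) ? factor : mod + 1;
      }

      public static void main(String[] args) {
        int n = 13, factor = 10;
        for (int passNo = 1; n > factor; passNo++) {
          int f = getPassFactor(factor, passNo, n);
          System.out.println("pass " + passNo + ": merge " + f + " of " + n);
          n = n - f + 1; // f segments in, 1 merged segment out
        }
        System.out.println("final pass merges the remaining " + n); // 10
      }
    }
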
+
+    /** Return (& remove) the requested number of segment descriptors from the
+     * head of the sorted segment list.
+ */
+ private List<Segment> getSegmentDescriptors(int numDescriptors) {
+ if (numDescriptors > segments.size()) {
+ List<Segment> subList = new ArrayList<Segment>(segments);
+ segments.clear();
+ return subList;
+ }
+
+ List<Segment> subList =
+ new ArrayList<Segment>(segments.subList(0, numDescriptors));
+ for (int i=0; i < numDescriptors; ++i) {
+ segments.remove(0);
+ }
+ return subList;
+ }
+
+ /**
+     * Compute the expected number of input bytes across all merges; this is
+     * used in calculating mergeProgress. It simulates the merge() method
+     * above and estimates the number of bytes that will be merged in all
+     * passes (assuming that no combiner is called while merging).
+     * @param factor the merge factor (mapreduce.task.io.sort.factor in MR)
+ * @param inMem number of segments in memory to be merged
+ */
+ long computeBytesInMerges(int factor, int inMem) {
+ int numSegments = segments.size();
+ List<Long> segmentSizes = new ArrayList<Long>(numSegments);
+ long totalBytes = 0;
+ int n = numSegments - inMem;
+ // factor for 1st pass
+ int f = getPassFactor(factor, 1, n) + inMem;
+ n = numSegments;
+
+ for (int i = 0; i < numSegments; i++) {
+ // Not handling empty segments here assuming that it would not affect
+ // much in calculation of mergeProgress.
+ segmentSizes.add(segments.get(i).getLength());
+ }
+
+      // If includeFinalMerge is true, allow the following while loop to run
+      // for one more iteration. This includes the final merge in the
+      // computation of the expected input bytes of all merges.
+ boolean considerFinalMerge = includeFinalMerge;
+
+ while (n > f || considerFinalMerge) {
+        if (n <= f) {
+ considerFinalMerge = false;
+ }
+ long mergedSize = 0;
+ f = Math.min(f, segmentSizes.size());
+ for (int j = 0; j < f; j++) {
+ mergedSize += segmentSizes.remove(0);
+ }
+ totalBytes += mergedSize;
+
+ // insert new size into the sorted list
+ int pos = Collections.binarySearch(segmentSizes, mergedSize);
+ if (pos < 0) {
+ pos = -pos-1;
+ }
+ segmentSizes.add(pos, mergedSize);
+
+ n -= (f-1);
+ f = factor;
+ }
+
+ return totalBytes;
+ }
+
+ public Progress getProgress() {
+ return mergeProgress;
+ }
+
+ }
+}
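
A rough driver for the segment-list merge() overload above (hypothetical, not
part of this commit): a null codec and null counters are passed for brevity,
and the key/value classes, comparator and reporter are assumed to come from
the surrounding task code. The returned iterator can then be fed to
writeFile().

    static TezRawKeyValueIterator mergeSpills(Configuration conf, FileSystem fs,
        Path[] spills, Class keyClass, Class valClass,
        RawComparator comparator, Progressable reporter, Path tmpDir)
        throws IOException {
      List<TezMerger.Segment> segments = new ArrayList<TezMerger.Segment>();
      for (Path p : spills) {
        // preserve=false: each spill file is deleted once its segment is closed
        segments.add(new TezMerger.Segment(conf, fs, p, null /* codec */, false));
      }
      return TezMerger.merge(conf, fs, keyClass, valClass, segments,
          100 /* mergeFactor */, tmpDir, comparator, reporter,
          null /* readsCounter */, null /* writesCounter */, new Progress());
    }
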
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
new file mode 100644
index 0000000..3a2c2bf
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezRawKeyValueIterator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * <code>TezRawKeyValueIterator</code> is an iterator used to iterate over
+ * the raw keys and values during sort/merge of intermediate data.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TezRawKeyValueIterator {
+ /**
+ * Gets the current raw key.
+ *
+   * @return the current raw key as a DataInputBuffer
+ * @throws IOException
+ */
+ DataInputBuffer getKey() throws IOException;
+
+ /**
+ * Gets the current raw value.
+ *
+   * @return the current raw value as a DataInputBuffer
+ * @throws IOException
+ */
+ DataInputBuffer getValue() throws IOException;
+
+ /**
+ * Sets up the current key and value (for getKey and getValue).
+ *
+ * @return <code>true</code> if there exists a key/value,
+ * <code>false</code> otherwise.
+ * @throws IOException
+ */
+ boolean next() throws IOException;
+
+ /**
+ * Closes the iterator so that the underlying streams can be closed.
+ *
+ * @throws IOException
+ */
+ void close() throws IOException;
+
+  /** Gets the Progress object; this holds a float (0.0 - 1.0)
+   * indicating the fraction of bytes processed by the iterator so far.
+ */
+ Progress getProgress();
+}
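
A minimal consumption sketch for this interface (hypothetical caller, not
part of this commit). The raw key/value bytes live in each DataInputBuffer
between getPosition() and getLength(), which is the same window the merge
comparator reads.

    static void drain(TezRawKeyValueIterator it) throws IOException {
      try {
        while (it.next()) {
          DataInputBuffer k = it.getKey();
          int keyLen = k.getLength() - k.getPosition(); // raw key window
          DataInputBuffer v = it.getValue();
          int valLen = v.getLength() - v.getPosition(); // raw value window
          // deserialize key/value from the byte windows here;
          // it.getProgress() reflects the fraction consumed so far (0.0 - 1.0)
        }
      } finally {
        it.close();
      }
    }
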
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
new file mode 100644
index 0000000..ab4142b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezSpillRecord.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.runtime.library.common.sort.impl;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.tez.runtime.library.common.Constants;
+
+public class TezSpillRecord {
+
+ /** Backing store */
+ private final ByteBuffer buf;
+ /** View of backing storage as longs */
+ private final LongBuffer entries;
+
+ public TezSpillRecord(int numPartitions) {
+ buf = ByteBuffer.allocate(
+ numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ entries = buf.asLongBuffer();
+ }
+
+ public TezSpillRecord(Path indexFileName, Configuration job) throws IOException {
+ this(indexFileName, job, null);
+ }
+
+ public TezSpillRecord(Path indexFileName, Configuration job, String expectedIndexOwner)
+ throws IOException {
+ this(indexFileName, job, new PureJavaCrc32(), expectedIndexOwner);
+ }
+
+ public TezSpillRecord(Path indexFileName, Configuration job, Checksum crc,
+ String expectedIndexOwner)
+ throws IOException {
+
+ final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+ final FSDataInputStream in = rfs.open(indexFileName);
+ try {
+ final long length = rfs.getFileStatus(indexFileName).getLen();
+ final int partitions =
+ (int) length / Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ final int size = partitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+
+ buf = ByteBuffer.allocate(size);
+ if (crc != null) {
+ crc.reset();
+ CheckedInputStream chk = new CheckedInputStream(in, crc);
+ IOUtils.readFully(chk, buf.array(), 0, size);
+ if (chk.getChecksum().getValue() != in.readLong()) {
+ throw new ChecksumException("Checksum error reading spill index: " +
+ indexFileName, -1);
+ }
+ } else {
+ IOUtils.readFully(in, buf.array(), 0, size);
+ }
+ entries = buf.asLongBuffer();
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+ * Return number of IndexRecord entries in this spill.
+ */
+ public int size() {
+ return entries.capacity() / (Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8);
+ }
+
+ /**
+ * Get spill offsets for given partition.
+ */
+ public TezIndexRecord getIndex(int partition) {
+ final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+ return new TezIndexRecord(entries.get(pos), entries.get(pos + 1),
+ entries.get(pos + 2));
+ }
+
+ /**
+ * Set spill offsets for given partition.
+ */
+ public void putIndex(TezIndexRecord rec, int partition) {
+ final int pos = partition * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8;
+ entries.put(pos, rec.getStartOffset());
+ entries.put(pos + 1, rec.getRawLength());
+ entries.put(pos + 2, rec.getPartLength());
+ }
+
+ /**
+ * Write this spill record to the location provided.
+ */
+ public void writeToFile(Path loc, Configuration job)
+ throws IOException {
+ writeToFile(loc, job, new PureJavaCrc32());
+ }
+
+ public void writeToFile(Path loc, Configuration job, Checksum crc)
+ throws IOException {
+ final FileSystem rfs = FileSystem.getLocal(job).getRaw();
+ CheckedOutputStream chk = null;
+ final FSDataOutputStream out = rfs.create(loc);
+ try {
+ if (crc != null) {
+ crc.reset();
+ chk = new CheckedOutputStream(out, crc);
+ chk.write(buf.array());
+ out.writeLong(chk.getChecksum().getValue());
+ } else {
+ out.write(buf.array());
+ }
+ } finally {
+ if (chk != null) {
+ chk.close();
+ } else {
+ out.close();
+ }
+ }
+ }
+
+}
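
The index file read and written above is a flat array of records, one per
partition, each Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH = 24 bytes, i.e.
three longs (start offset, raw length, part length); that is why size(),
getIndex() and putIndex() all divide by 8. A small standalone sketch of the
same layout arithmetic (plain java.nio, with the record length inlined):

    int RECORD_LEN = 24;                   // 3 longs per partition
    java.nio.LongBuffer entries =
        java.nio.ByteBuffer.allocate(4 * RECORD_LEN).asLongBuffer();
    int partition = 2;
    int pos = partition * RECORD_LEN / 8;  // long index of this record
    entries.put(pos,     1024L);           // start offset of the partition data
    entries.put(pos + 1,  512L);           // raw (uncompressed) length
    entries.put(pos + 2,  300L);           // part (on-disk) length
    int numRecords = entries.capacity() / (RECORD_LEN / 8);  // == 4, as in size()
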
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
new file mode 100644
index 0000000..1ff486f
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -0,0 +1,1108 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.sort.impl.dflt;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.util.IndexedSortable;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
+import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment;
+
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+
+ private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
+
+ // TODO NEWTEZ Progress reporting to Tez framework. (making progress vs %complete)
+
+ /**
+ * The size of each record in the index file for the map-outputs.
+ */
+ public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
+
+ private final static int APPROX_HEADER_LENGTH = 150;
+
+ // k/v accounting
+ IntBuffer kvmeta; // metadata overlay on backing store
+ int kvstart; // marks origin of spill metadata
+ int kvend; // marks end of spill metadata
+ int kvindex; // marks end of fully serialized records
+
+ int equator; // marks origin of meta/serialization
+ int bufstart; // marks beginning of spill
+ int bufend; // marks beginning of collectable
+ int bufmark; // marks end of record
+ int bufindex; // marks end of collected
+ int bufvoid; // marks the point where we should stop
+ // reading at the end of the buffer
+
+ byte[] kvbuffer; // main output buffer
+ private final byte[] b0 = new byte[0];
+
+ protected static final int INDEX = 0; // index offset in acct
+ protected static final int VALSTART = 1; // val offset in acct
+ protected static final int KEYSTART = 2; // key offset in acct
+ protected static final int PARTITION = 3; // partition offset in acct
+ protected static final int NMETA = 4; // num meta ints
+ protected static final int METASIZE = NMETA * 4; // size in bytes
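
Given the constants above, each collected record owns one accounting entry of
NMETA = 4 ints (METASIZE = 16 bytes) in kvmeta, laid out as [INDEX, VALSTART,
KEYSTART, PARTITION]; the key's length is VALSTART - KEYSTART, as compare()
below relies on. A hypothetical decoding helper (not part of this commit):

    // kvoff is the int offset of one record's metadata within kvmeta
    static int[] decodeMeta(java.nio.IntBuffer kvmeta, int kvoff) {
      int index     = kvmeta.get(kvoff + 0);  // INDEX: indirection offset
      int valstart  = kvmeta.get(kvoff + 1);  // VALSTART: value bytes begin
      int keystart  = kvmeta.get(kvoff + 2);  // KEYSTART: key bytes begin
      int partition = kvmeta.get(kvoff + 3);  // PARTITION: target partition
      return new int[] { index, valstart, keystart, partition };
    }
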
+
+ // spill accounting
+ int maxRec;
+ int softLimit;
+ boolean spillInProgress;
+ int bufferRemaining;
+ volatile Throwable sortSpillException = null;
+
+ int numSpills = 0;
+ int minSpillsForCombine;
+ final ReentrantLock spillLock = new ReentrantLock();
+ final Condition spillDone = spillLock.newCondition();
+ final Condition spillReady = spillLock.newCondition();
+ final BlockingBuffer bb = new BlockingBuffer();
+ volatile boolean spillThreadRunning = false;
+ final SpillThread spillThread = new SpillThread();
+
+ final ArrayList<TezSpillRecord> indexCacheList =
+ new ArrayList<TezSpillRecord>();
+ private int totalIndexCacheMemory;
+ private int indexCacheMemoryLimit;
+
+ @Override
+ public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+ super.initialize(outputContext, conf, numOutputs);
+
+ // sanity checks
+ final float spillper = this.conf.getFloat(
+ TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SORT_SPILL_PERCENT);
+ final int sortmb = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_MB);
+ if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
+ throw new IOException("Invalid \""
+ + TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT + "\": " + spillper);
+ }
+ if ((sortmb & 0x7FF) != sortmb) {
+ throw new IOException("Invalid \"" + TezJobConfig.TEZ_RUNTIME_IO_SORT_MB
+ + "\": " + sortmb);
+ }
+
+ indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+
+ // buffers and accounting
+ int maxMemUsage = sortmb << 20;
+ maxMemUsage -= maxMemUsage % METASIZE;
+ kvbuffer = new byte[maxMemUsage];
+ bufvoid = kvbuffer.length;
+ kvmeta = ByteBuffer.wrap(kvbuffer)
+ .order(ByteOrder.nativeOrder())
+ .asIntBuffer();
+ setEquator(0);
+ bufstart = bufend = bufindex = equator;
+ kvstart = kvend = kvindex;
+
+ maxRec = kvmeta.capacity() / NMETA;
+ softLimit = (int)(kvbuffer.length * spillper);
+ bufferRemaining = softLimit;
+ if (LOG.isInfoEnabled()) {
+ LOG.info(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB + ": " + sortmb);
+ LOG.info("soft limit at " + softLimit);
+ LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
+ LOG.info("kvstart = " + kvstart + "; length = " + maxRec);
+ }
+
+ // k/v serialization
+ valSerializer.open(bb);
+ keySerializer.open(bb);
+
+ spillInProgress = false;
+ minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS, 3);
+ spillThread.setDaemon(true);
+ spillThread.setName("SpillThread");
+ spillLock.lock();
+ try {
+ spillThread.start();
+ while (!spillThreadRunning) {
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Spill thread failed to initialize", e);
+ } finally {
+ spillLock.unlock();
+ }
+ if (sortSpillException != null) {
+ throw new IOException("Spill thread failed to initialize",
+ sortSpillException);
+ }
+ }
+
+ @Override
+ public void write(Object key, Object value)
+ throws IOException {
+ collect(
+ key, value, partitioner.getPartition(key, value, partitions));
+ }
+
+ /**
+ * Serialize the key, value to intermediate storage.
+ * When this method returns, kvindex must refer to sufficient unused
+ * storage to store one METADATA.
+ */
+ synchronized void collect(Object key, Object value, final int partition
+ ) throws IOException {
+
+ if (key.getClass() != keyClass) {
+ throw new IOException("Type mismatch in key from map: expected "
+ + keyClass.getName() + ", received "
+ + key.getClass().getName());
+ }
+ if (value.getClass() != valClass) {
+ throw new IOException("Type mismatch in value from map: expected "
+ + valClass.getName() + ", received "
+ + value.getClass().getName());
+ }
+ if (partition < 0 || partition >= partitions) {
+ throw new IOException("Illegal partition for " + key + " (" +
+ partition + ")" + ", TotalPartitions: " + partitions);
+ }
+ checkSpillException();
+ bufferRemaining -= METASIZE;
+ if (bufferRemaining <= 0) {
+ // start spill if the thread is not running and the soft limit has been
+ // reached
+ spillLock.lock();
+ try {
+ do {
+ if (!spillInProgress) {
+ final int kvbidx = 4 * kvindex;
+ final int kvbend = 4 * kvend;
+ // serialized, unspilled bytes always lie between kvindex and
+ // bufindex, crossing the equator. Note that any void space
+ // created by a reset must be included in "used" bytes
+ final int bUsed = distanceTo(kvbidx, bufindex);
+ final boolean bufsoftlimit = bUsed >= softLimit;
+ if ((kvbend + METASIZE) % kvbuffer.length !=
+ equator - (equator % METASIZE)) {
+ // spill finished, reclaim space
+ resetSpill();
+ bufferRemaining = Math.min(
+ distanceTo(bufindex, kvbidx) - 2 * METASIZE,
+ softLimit - bUsed) - METASIZE;
+ continue;
+ } else if (bufsoftlimit && kvindex != kvend) {
+ // spill records, if any collected; check latter, as it may
+ // be possible for metadata alignment to hit spill pcnt
+ startSpill();
+ final int avgRec = (int)
+ (mapOutputByteCounter.getValue() /
+ mapOutputRecordCounter.getValue());
+ // leave at least half the split buffer for serialization data
+ // ensure that kvindex >= bufindex
+ final int distkvi = distanceTo(bufindex, kvbidx);
+ final int newPos = (bufindex +
+ Math.max(2 * METASIZE - 1,
+ Math.min(distkvi / 2,
+ distkvi / (METASIZE + avgRec) * METASIZE)))
+ % kvbuffer.length;
+ setEquator(newPos);
+ bufmark = bufindex = newPos;
+ final int serBound = 4 * kvend;
+ // bytes remaining before the lock must be held and limits
+ // checked is the minimum of three arcs: the metadata space, the
+ // serialization space, and the soft limit
+ bufferRemaining = Math.min(
+ // metadata max
+ distanceTo(bufend, newPos),
+ Math.min(
+ // serialization max
+ distanceTo(newPos, serBound),
+ // soft limit
+ softLimit)) - 2 * METASIZE;
+ }
+ }
+ } while (false);
+ } finally {
+ spillLock.unlock();
+ }
+ }
+
+ try {
+ // serialize key bytes into buffer
+ int keystart = bufindex;
+ keySerializer.serialize(key);
+ if (bufindex < keystart) {
+ // wrapped the key; must make contiguous
+ bb.shiftBufferedKey();
+ keystart = 0;
+ }
+ // serialize value bytes into buffer
+ final int valstart = bufindex;
+ valSerializer.serialize(value);
+ // It's possible for records to have zero length, i.e. the serializer
+ // will perform no writes. To ensure that the boundary conditions are
+ // checked and that the kvindex invariant is maintained, perform a
+ // zero-length write into the buffer. The logic monitoring this could be
+ // moved into collect, but this is cleaner and inexpensive. For now, it
+ // is acceptable.
+ bb.write(b0, 0, 0);
+
+ // the record must be marked after the preceding write, as the metadata
+ // for this record are not yet written
+ int valend = bb.markRecord();
+
+ mapOutputRecordCounter.increment(1);
+ mapOutputByteCounter.increment(
+ distanceTo(keystart, valend, bufvoid));
+
+ // write accounting info
+ kvmeta.put(kvindex + INDEX, kvindex);
+ kvmeta.put(kvindex + PARTITION, partition);
+ kvmeta.put(kvindex + KEYSTART, keystart);
+ kvmeta.put(kvindex + VALSTART, valstart);
+ // advance kvindex
+ kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
+ } catch (MapBufferTooSmallException e) {
+ LOG.info("Record too large for in-memory buffer: " + e.getMessage());
+ spillSingleRecord(key, value, partition);
+ mapOutputRecordCounter.increment(1);
+ return;
+ }
+ }
+
+ /**
+ * Set the point from which meta and serialization data expand. The meta
+ * indices are aligned with the buffer, so metadata never spans the ends of
+ * the circular buffer.
+ */
+ private void setEquator(int pos) {
+ equator = pos;
+ // set index prior to first entry, aligned at meta boundary
+ final int aligned = pos - (pos % METASIZE);
+ kvindex =
+ ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
+ "(" + (kvindex * 4) + ")");
+ }
+ }
+
+ /**
+ * The spill is complete, so set the buffer and meta indices to be equal to
+ * the new equator to free space for continuing collection. Note that when
+ * kvindex == kvend == kvstart, the buffer is empty.
+ */
+ private void resetSpill() {
+ final int e = equator;
+ bufstart = bufend = e;
+ final int aligned = e - (e % METASIZE);
+ // set start/end to point to first meta record
+ kvstart = kvend =
+ ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
+ (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
+ }
+ }
+
+ /**
+ * Compute the distance in bytes between two indices in the serialization
+ * buffer.
+ * @see #distanceTo(int,int,int)
+ */
+ final int distanceTo(final int i, final int j) {
+ return distanceTo(i, j, kvbuffer.length);
+ }
+
+ /**
+ * Compute the distance between two indices in the circular buffer given the
+ * max distance.
+ */
+ int distanceTo(final int i, final int j, final int mod) {
+ return i <= j
+ ? j - i
+ : mod - i + j;
+ }
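
The two-argument form above defaults mod to the collection buffer length; the
wrap-around arithmetic can be checked in isolation (plain Java, mirroring
distanceTo):

    static int distanceTo(int i, int j, int mod) {
      return i <= j ? j - i : mod - i + j;
    }
    // distanceTo(10, 50, 100) == 40  (no wrap)
    // distanceTo(90, 10, 100) == 20  (wraps past the end of the buffer)
    // distanceTo(42, 42, 100) == 0   (equal indices, empty span)
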
+
+ /**
+ * For the given meta position, return the dereferenced position in the
+ * integer array. Each meta block contains several integers describing
+ * record data in its serialized form, but the INDEX is not necessarily
+ * related to the proximate metadata. The index value at the referenced int
+ * position is the start offset of the associated metadata block. So the
+ * metadata INDEX at metapos may point to the metadata described by the
+ * metadata block at metapos + k, which contains information about that
+ * serialized record.
+ */
+ int offsetFor(int metapos) {
+ return kvmeta.get((metapos % maxRec) * NMETA + INDEX);
+ }
+
+ /**
+   * Compare the records at logical positions i and j, taken MOD the offset
+   * capacity. Compare by partition, then by key.
+ * @see IndexedSortable#compare
+ */
+ public int compare(final int mi, final int mj) {
+ final int kvi = offsetFor(mi);
+ final int kvj = offsetFor(mj);
+ final int kvip = kvmeta.get(kvi + PARTITION);
+ final int kvjp = kvmeta.get(kvj + PARTITION);
+ // sort by partition
+ if (kvip != kvjp) {
+ return kvip - kvjp;
+ }
+ // sort by key
+ return comparator.compare(kvbuffer,
+ kvmeta.get(kvi + KEYSTART),
+ kvmeta.get(kvi + VALSTART) - kvmeta.get(kvi + KEYSTART),
+ kvbuffer,
+ kvmeta.get(kvj + KEYSTART),
+ kvmeta.get(kvj + VALSTART) - kvmeta.get(kvj + KEYSTART));
+ }
+
+ /**
+   * Swap the records at logical indices i and j, taken MOD the offset capacity.
+ * @see IndexedSortable#swap
+ */
+ public void swap(final int mi, final int mj) {
+ final int kvi = (mi % maxRec) * NMETA + INDEX;
+ final int kvj = (mj % maxRec) * NMETA + INDEX;
+ int tmp = kvmeta.get(kvi);
+ kvmeta.put(kvi, kvmeta.get(kvj));
+ kvmeta.put(kvj, tmp);
+ }
+
+ /**
+ * Inner class managing the spill of serialized records to disk.
+ */
+ protected class BlockingBuffer extends DataOutputStream {
+
+ public BlockingBuffer() {
+ super(new Buffer());
+ }
+
+ /**
+ * Mark end of record. Note that this is required if the buffer is to
+ * cut the spill in the proper place.
+ */
+ public int markRecord() {
+ bufmark = bufindex;
+ return bufindex;
+ }
+
+ /**
+ * Set position from last mark to end of writable buffer, then rewrite
+ * the data between last mark and kvindex.
+ * This handles a special case where the key wraps around the buffer.
+ * If the key is to be passed to a RawComparator, then it must be
+ * contiguous in the buffer. This recopies the data in the buffer back
+ * into itself, but starting at the beginning of the buffer. Note that
+ * this method should <b>only</b> be called immediately after detecting
+ * this condition. To call it at any other time is undefined and would
+ * likely result in data loss or corruption.
+ * @see #markRecord()
+ */
+ protected void shiftBufferedKey() throws IOException {
+ // spillLock unnecessary; both kvend and kvindex are current
+ int headbytelen = bufvoid - bufmark;
+ bufvoid = bufmark;
+ final int kvbidx = 4 * kvindex;
+ final int kvbend = 4 * kvend;
+ final int avail =
+ Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
+ if (bufindex + headbytelen < avail) {
+ System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
+ System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
+ bufindex += headbytelen;
+ bufferRemaining -= kvbuffer.length - bufvoid;
+ } else {
+ byte[] keytmp = new byte[bufindex];
+ System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
+ bufindex = 0;
+ out.write(kvbuffer, bufmark, headbytelen);
+ out.write(keytmp);
+ }
+ }
+ }
+
+ public class Buffer extends OutputStream {
+ private final byte[] scratch = new byte[1];
+
+ @Override
+ public void write(int v)
+ throws IOException {
+ scratch[0] = (byte)v;
+ write(scratch, 0, 1);
+ }
+
+ /**
+ * Attempt to write a sequence of bytes to the collection buffer.
+ * This method will block if the spill thread is running and it
+ * cannot write.
+ * @throws MapBufferTooSmallException if record is too large to
+ * deserialize into the collection buffer.
+ */
+ @Override
+ public void write(byte b[], int off, int len)
+ throws IOException {
+ // must always verify the invariant that at least METASIZE bytes are
+ // available beyond kvindex, even when len == 0
+ bufferRemaining -= len;
+ if (bufferRemaining <= 0) {
+ // writing these bytes could exhaust available buffer space or fill
+ // the buffer to soft limit. check if spill or blocking are necessary
+ boolean blockwrite = false;
+ spillLock.lock();
+ try {
+ do {
+ checkSpillException();
+
+ final int kvbidx = 4 * kvindex;
+ final int kvbend = 4 * kvend;
+ // ser distance to key index
+ final int distkvi = distanceTo(bufindex, kvbidx);
+ // ser distance to spill end index
+ final int distkve = distanceTo(bufindex, kvbend);
+
+ // if kvindex is closer than kvend, then a spill is neither in
+ // progress nor complete and reset since the lock was held. The
+ // write should block only if there is insufficient space to
+ // complete the current write, write the metadata for this record,
+ // and write the metadata for the next record. If kvend is closer,
+ // then the write should block if there is too little space for
+ // either the metadata or the current write. Note that collect
+ // ensures its metadata requirement with a zero-length write
+ blockwrite = distkvi <= distkve
+ ? distkvi <= len + 2 * METASIZE
+ : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;
+
+ if (!spillInProgress) {
+ if (blockwrite) {
+ if ((kvbend + METASIZE) % kvbuffer.length !=
+ equator - (equator % METASIZE)) {
+ // spill finished, reclaim space
+ // need to use meta exclusively; zero-len rec & 100% spill
+ // pcnt would fail
+ resetSpill(); // resetSpill doesn't move bufindex, kvindex
+ bufferRemaining = Math.min(
+ distkvi - 2 * METASIZE,
+ softLimit - distanceTo(kvbidx, bufindex)) - len;
+ continue;
+ }
+ // we have records we can spill; only spill if blocked
+ if (kvindex != kvend) {
+ startSpill();
+ // Blocked on this write, waiting for the spill just
+ // initiated to finish. Instead of repositioning the marker
+ // and copying the partial record, we set the record start
+ // to be the new equator
+ setEquator(bufmark);
+ } else {
+ // We have no buffered records, and this record is too large
+ // to write into kvbuffer. We must spill it directly from
+ // collect
+ final int size = distanceTo(bufstart, bufindex) + len;
+ setEquator(0);
+ bufstart = bufend = bufindex = equator;
+ kvstart = kvend = kvindex;
+ bufvoid = kvbuffer.length;
+ throw new MapBufferTooSmallException(size + " bytes");
+ }
+ }
+ }
+
+ if (blockwrite) {
+ // wait for spill
+ try {
+ while (spillInProgress) {
+ spillDone.await();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(
+ "Buffer interrupted while waiting for the writer", e);
+ }
+ }
+ } while (blockwrite);
+ } finally {
+ spillLock.unlock();
+ }
+ }
+ // here, we know that we have sufficient space to write
+ if (bufindex + len > bufvoid) {
+ final int gaplen = bufvoid - bufindex;
+ System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
+ len -= gaplen;
+ off += gaplen;
+ bufindex = 0;
+ }
+ System.arraycopy(b, off, kvbuffer, bufindex, len);
+ bufindex += len;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ LOG.info("Starting flush of map output");
+ spillLock.lock();
+ try {
+ while (spillInProgress) {
+ spillDone.await();
+ }
+ checkSpillException();
+
+ final int kvbend = 4 * kvend;
+ if ((kvbend + METASIZE) % kvbuffer.length !=
+ equator - (equator % METASIZE)) {
+ // spill finished
+ resetSpill();
+ }
+ if (kvindex != kvend) {
+ kvend = (kvindex + NMETA) % kvmeta.capacity();
+ bufend = bufmark;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Sorting & Spilling map output");
+ LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+ "; bufvoid = " + bufvoid);
+ LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+ "); kvend = " + kvend + "(" + (kvend * 4) +
+ "); length = " + (distanceTo(kvend, kvstart,
+ kvmeta.capacity()) + 1) + "/" + maxRec);
+ }
+ sortAndSpill();
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted while waiting for the writer", e);
+ } finally {
+ spillLock.unlock();
+ }
+ assert !spillLock.isHeldByCurrentThread();
+ // shut down spill thread and wait for it to exit. Since the preceding
+ // ensures that it is finished with its work (and sortAndSpill did not
+ // throw), we elect to use an interrupt instead of setting a flag.
+ // Spilling simultaneously from this thread while the spill thread
+ // finishes its work might be both a useful way to extend this and also
+ // sufficient motivation for the latter approach.
+ try {
+ spillThread.interrupt();
+ spillThread.join();
+ } catch (InterruptedException e) {
+ throw new IOException("Spill failed", e);
+ }
+ // release sort buffer before the merge
+ //FIXME
+ //kvbuffer = null;
+ mergeParts();
+ Path outputPath = mapOutputFile.getOutputFile();
+ fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
+ }
+
+ @Override
+ public void close() throws IOException { }
+
+ protected class SpillThread extends Thread {
+
+ @Override
+ public void run() {
+ spillLock.lock();
+ spillThreadRunning = true;
+ try {
+ while (true) {
+ spillDone.signal();
+ while (!spillInProgress) {
+ spillReady.await();
+ }
+ try {
+ spillLock.unlock();
+ sortAndSpill();
+ } catch (Throwable t) {
+ LOG.warn("Got an exception in sortAndSpill", t);
+ sortSpillException = t;
+ } finally {
+ spillLock.lock();
+ if (bufend < bufstart) {
+ bufvoid = kvbuffer.length;
+ }
+ kvstart = kvend;
+ bufstart = bufend;
+ spillInProgress = false;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ spillLock.unlock();
+ spillThreadRunning = false;
+ }
+ }
+ }
+
+ private void checkSpillException() throws IOException {
+ final Throwable lspillException = sortSpillException;
+ if (lspillException != null) {
+ if (lspillException instanceof Error) {
+ final String logMsg = "Task " + outputContext.getUniqueIdentifier()
+ + " failed : " + StringUtils.stringifyException(lspillException);
+ outputContext.fatalError(lspillException, logMsg);
+ }
+ throw new IOException("Spill failed", lspillException);
+ }
+ }
+
+ private void startSpill() {
+ assert !spillInProgress;
+ kvend = (kvindex + NMETA) % kvmeta.capacity();
+ bufend = bufmark;
+ spillInProgress = true;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Spilling map output");
+ LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
+ "; bufvoid = " + bufvoid);
+ LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
+ "); kvend = " + kvend + "(" + (kvend * 4) +
+ "); length = " + (distanceTo(kvend, kvstart,
+ kvmeta.capacity()) + 1) + "/" + maxRec);
+ }
+ spillReady.signal();
+ }
+
+ int getMetaStart() {
+ return kvend / NMETA;
+ }
+
+ int getMetaEnd() {
+ return 1 + // kvend is a valid record
+ (kvstart >= kvend
+ ? kvstart
+ : kvmeta.capacity() + kvstart) / NMETA;
+ }
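+
+ // Illustrative example (hypothetical numbers, assuming the usual
+ // NMETA of 4 ints per metadata record): with kvmeta.capacity() = 4000,
+ // kvend = 3996 and kvstart = 8, getMetaStart() = 999 and
+ // getMetaEnd() = 1 + 4008/4 = 1003, so the sort range covers the
+ // records at int offsets 3996, 0, 4 and 8 of the metadata ring.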
+
+ protected void sortAndSpill()
+ throws IOException, InterruptedException {
+ final int mstart = getMetaStart();
+ final int mend = getMetaEnd();
+ sorter.sort(this, mstart, mend, nullProgressable);
+ spill(mstart, mend);
+ }
+
+ protected void spill(int mstart, int mend)
+ throws IOException, InterruptedException {
+
+ //approximate the length of the output file to be the length of the
+ //buffer + header lengths for the partitions
+ final long size = (bufend >= bufstart
+ ? bufend - bufstart
+ : (bufvoid - bufend) + bufstart) +
+ partitions * APPROX_HEADER_LENGTH;
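+ // e.g. (hypothetical numbers): with bufstart = 0, bufend = 900000,
+ // 10 partitions and the usual APPROX_HEADER_LENGTH of 150 bytes,
+ // size is estimated as 900000 + 10 * 150 = 901500 bytes.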
+ FSDataOutputStream out = null;
+ try {
+ // create spill file
+ final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
+ out = rfs.create(filename);
+
+ int spindex = mstart;
+ final InMemValBytes value = createInMemValBytes();
+ for (int i = 0; i < partitions; ++i) {
+ IFile.Writer writer = null;
+ try {
+ long segmentStart = out.getPos();
+ writer = new Writer(conf, out, keyClass, valClass, codec,
+ spilledRecordsCounter);
+ if (combiner == null) {
+ // spill directly
+ DataInputBuffer key = new DataInputBuffer();
+ while (spindex < mend &&
+ kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
+ final int kvoff = offsetFor(spindex);
+ key.reset(
+ kvbuffer,
+ kvmeta.get(kvoff + KEYSTART),
+ (kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART))
+ );
+ getVBytesForOffset(kvoff, value);
+ writer.append(key, value);
+ ++spindex;
+ }
+ } else {
+ int spstart = spindex;
+ while (spindex < mend &&
+ kvmeta.get(offsetFor(spindex)
+ + PARTITION) == i) {
+ ++spindex;
+ }
+ // Note: we would like to avoid the combiner if we have fewer
+ // than some threshold of records for a partition
+ if (spstart != spindex) {
+ TezRawKeyValueIterator kvIter =
+ new MRResultIterator(spstart, spindex);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running combine processor");
+ }
+ runCombineProcessor(kvIter, writer);
+ }
+ }
+
+ // close the writer
+ writer.close();
+
+ // record offsets
+ final TezIndexRecord rec =
+ new TezIndexRecord(
+ segmentStart,
+ writer.getRawLength(),
+ writer.getCompressedLength());
+ spillRec.putIndex(rec, i);
+
+ writer = null;
+ } finally {
+ if (null != writer) writer.close();
+ }
+ }
+
+ if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
+ // create spill index file
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ spillRec.writeToFile(indexFilename, conf);
+ } else {
+ indexCacheList.add(spillRec);
+ totalIndexCacheMemory +=
+ spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ }
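+ // Illustrative caching cost (hypothetical numbers, assuming the
+ // usual 24-byte index record of three longs): with 50 partitions each
+ // cached spill index adds 50 * 24 = 1200 bytes; once the cache
+ // crosses indexCacheMemoryLimit, later indices go straight to disk.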
+ LOG.info("Finished spill " + numSpills);
+ ++numSpills;
+ } finally {
+ if (out != null) out.close();
+ }
+ }
+
+ /**
+ * Handles the degenerate case where serialization fails to fit in
+ * the in-memory buffer, so we must spill the record from collect
+ * directly to a spill file. Consider this "losing".
+ */
+ private void spillSingleRecord(final Object key, final Object value,
+ int partition) throws IOException {
+ long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
+ FSDataOutputStream out = null;
+ try {
+ // create spill file
+ final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+ final Path filename =
+ mapOutputFile.getSpillFileForWrite(numSpills, size);
+ out = rfs.create(filename);
+
+ // we don't run the combiner for a single record
+ for (int i = 0; i < partitions; ++i) {
+ IFile.Writer writer = null;
+ try {
+ long segmentStart = out.getPos();
+ // The same codec is reused for every partition's writer
+ writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
+ spilledRecordsCounter);
+
+ if (i == partition) {
+ final long recordStart = out.getPos();
+ writer.append(key, value);
+ // Note that our map byte count will not be accurate with
+ // compression
+ mapOutputByteCounter.increment(out.getPos() - recordStart);
+ }
+ writer.close();
+
+ // record offsets
+ TezIndexRecord rec =
+ new TezIndexRecord(
+ segmentStart,
+ writer.getRawLength(),
+ writer.getCompressedLength());
+ spillRec.putIndex(rec, i);
+
+ writer = null;
+ } catch (IOException e) {
+ if (null != writer) writer.close();
+ throw e;
+ }
+ }
+ if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
+ // create spill index file
+ Path indexFilename =
+ mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
+ * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+ spillRec.writeToFile(indexFilename, conf);
+ } else {
+ indexCacheList.add(spillRec);
+ totalIndexCacheMemory +=
+ spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ }
+ ++numSpills;
+ } finally {
+ if (out != null) out.close();
+ }
+ }
+
+ protected int getInMemVBytesLength(int kvoff) {
+ // use the keystart of the next serialized record as the end of this
+ // value; if this is the last value in the buffer, use bufend
+ final int nextindex = kvoff == kvend
+ ? bufend
+ : kvmeta.get(
+ (kvoff - NMETA + kvmeta.capacity() + KEYSTART) % kvmeta.capacity());
+ // calculate the length of the value
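+ // e.g. (hypothetical numbers): with bufvoid = 1000, a VALSTART of 980
+ // and nextindex = 20, the value wraps and vallen = (1000 - 980) + 20
+ // = 40 bytes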
+ int vallen = (nextindex >= kvmeta.get(kvoff + VALSTART))
+ ? nextindex - kvmeta.get(kvoff + VALSTART)
+ : (bufvoid - kvmeta.get(kvoff + VALSTART)) + nextindex;
+ return vallen;
+ }
+
+ /**
+ * Given an offset, populate vbytes with the associated set of
+ * serialized value bytes. Should only be called during a spill.
+ */
+ int getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
+ int vallen = getInMemVBytesLength(kvoff);
+ vbytes.reset(kvbuffer, kvmeta.get(kvoff + VALSTART), vallen);
+ return vallen;
+ }
+
+ /**
+ * Inner class wrapping value bytes, used for appendRaw.
+ */
+ static class InMemValBytes extends DataInputBuffer {
+ private byte[] buffer;
+ private int start;
+ private int length;
+ private final int bufvoid;
+
+ public InMemValBytes(int bufvoid) {
+ this.bufvoid = bufvoid;
+ }
+
+ public void reset(byte[] buffer, int start, int length) {
+ this.buffer = buffer;
+ this.start = start;
+ this.length = length;
+
+ if (start + length > bufvoid) {
+ this.buffer = new byte[this.length];
+ final int taillen = bufvoid - start;
+ System.arraycopy(buffer, start, this.buffer, 0, taillen);
+ System.arraycopy(buffer, 0, this.buffer, taillen, length - taillen);
+ this.start = 0;
+ }
+
+ super.reset(this.buffer, this.start, this.length);
+ }
+ }
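+
+ // Illustrative copy-on-wrap behavior of reset() above (hypothetical
+ // numbers): reset(buf, 995, 8) with bufvoid = 1000 copies the 5-byte
+ // tail (positions 995..999) and the 3-byte head (positions 0..2) into
+ // a fresh contiguous byte[8] before calling super.reset().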
+
+ InMemValBytes createInMemValBytes() {
+ return new InMemValBytes(bufvoid);
+ }
+
+ protected class MRResultIterator implements TezRawKeyValueIterator {
+ private final DataInputBuffer keybuf = new DataInputBuffer();
+ private final InMemValBytes vbytes = createInMemValBytes();
+ private final int end;
+ private int current;
+ public MRResultIterator(int start, int end) {
+ this.end = end;
+ current = start - 1;
+ }
+ public boolean next() throws IOException {
+ return ++current < end;
+ }
+ public DataInputBuffer getKey() throws IOException {
+ final int kvoff = offsetFor(current);
+ keybuf.reset(kvbuffer, kvmeta.get(kvoff + KEYSTART),
+ kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART));
+ return keybuf;
+ }
+ public DataInputBuffer getValue() throws IOException {
+ getVBytesForOffset(offsetFor(current), vbytes);
+ return vbytes;
+ }
+ public Progress getProgress() {
+ return null;
+ }
+ public void close() { }
+ }
+
+ private void mergeParts() throws IOException {
+ // get the approximate size of the final output/index files
+ long finalOutFileSize = 0;
+ long finalIndexFileSize = 0;
+ final Path[] filename = new Path[numSpills];
+ final String taskIdentifier = outputContext.getUniqueIdentifier();
+
+ for(int i = 0; i < numSpills; i++) {
+ filename[i] = mapOutputFile.getSpillFile(i);
+ finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
+ }
+ if (numSpills == 1) { //the spill is the final output
+ sameVolRename(filename[0],
+ mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
+ if (indexCacheList.size() == 0) {
+ sameVolRename(mapOutputFile.getSpillIndexFile(0),
+ mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+ } else {
+ indexCacheList.get(0).writeToFile(
+ mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
+ }
+ return;
+ }
+
+ // read in paged indices
+ for (int i = indexCacheList.size(); i < numSpills; ++i) {
+ Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+ indexCacheList.add(new TezSpillRecord(indexFileName, conf));
+ }
+
+ //make correction in the length to include the IFile header
+ //lengths for each partition
+ finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
+ finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
+ Path finalOutputFile =
+ mapOutputFile.getOutputFileForWrite(finalOutFileSize);
+ Path finalIndexFile =
+ mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
+
+ //The output stream for the final single output file
+ FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+
+ if (numSpills == 0) {
+ //no spills: create an output file with an empty segment per partition
+
+ TezSpillRecord sr = new TezSpillRecord(partitions);
+ try {
+ for (int i = 0; i < partitions; i++) {
+ long segmentStart = finalOut.getPos();
+ Writer writer =
+ new Writer(conf, finalOut, keyClass, valClass, codec, null);
+ writer.close();
+
+ TezIndexRecord rec =
+ new TezIndexRecord(
+ segmentStart,
+ writer.getRawLength(),
+ writer.getCompressedLength());
+ sr.putIndex(rec, i);
+ }
+ sr.writeToFile(finalIndexFile, conf);
+ } finally {
+ finalOut.close();
+ }
+ return;
+ } else {
+ TezMerger.considerFinalMergeForProgress();
+
+ final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+ for (int parts = 0; parts < partitions; parts++) {
+ //create the segments to be merged
+ List<Segment> segmentList =
+ new ArrayList<Segment>(numSpills);
+ for(int i = 0; i < numSpills; i++) {
+ TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+
+ Segment s =
+ new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
+ indexRecord.getPartLength(), codec, true);
+ segmentList.add(i, s);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
+ "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
+ indexRecord.getRawLength() + ", " +
+ indexRecord.getPartLength() + ")");
+ }
+ }
+
+ int mergeFactor =
+ this.conf.getInt(TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_IO_SORT_FACTOR);
+ // sort the segments only if there are intermediate merges
+ boolean sortSegments = segmentList.size() > mergeFactor;
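+ // e.g. (hypothetical numbers): with the commonly used default sort
+ // factor of 100 and 12 spill segments, 12 <= 100, so sortSegments is
+ // false and the merge completes in a single pass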
+ //merge
+ TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
+ keyClass, valClass, codec,
+ segmentList, mergeFactor,
+ new Path(taskIdentifier),
+ (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
+ nullProgressable, sortSegments,
+ null, spilledRecordsCounter,
+ null); // Not using any Progress in TezMerger. Should just work.
+
+ //write merged output to disk
+ long segmentStart = finalOut.getPos();
+ Writer writer =
+ new Writer(conf, finalOut, keyClass, valClass, codec,
+ spilledRecordsCounter);
+ if (combiner == null || numSpills < minSpillsForCombine) {
+ TezMerger.writeFile(kvIter, writer,
+ nullProgressable, conf);
+ } else {
+ runCombineProcessor(kvIter, writer);
+ }
+ writer.close();
+
+ // record offsets
+ final TezIndexRecord rec =
+ new TezIndexRecord(
+ segmentStart,
+ writer.getRawLength(),
+ writer.getCompressedLength());
+ spillRec.putIndex(rec, parts);
+ }
+ spillRec.writeToFile(finalIndexFile, conf);
+ finalOut.close();
+ for(int i = 0; i < numSpills; i++) {
+ rfs.delete(filename[i], true);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
new file mode 100644
index 0000000..92ae916
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/InMemoryShuffleSorter.java
@@ -0,0 +1,126 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.runtime.library.common.sort.impl.dflt;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.IntBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleHeader;
+import org.apache.tez.runtime.library.common.shuffle.server.ShuffleHandler;
+import org.apache.tez.runtime.library.common.sort.impl.IFile;
+
+public class InMemoryShuffleSorter extends DefaultSorter {
+
+ private static final Log LOG = LogFactory.getLog(InMemoryShuffleSorter.class);
+
+ static final int IFILE_EOF_LENGTH =
+ 2 * WritableUtils.getVIntSize(IFile.EOF_MARKER);
+ static final int IFILE_CHECKSUM_LENGTH = DataChecksum.Type.CRC32.size;
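+ // Assuming the usual IFile.EOF_MARKER of -1, each trailing vint is a
+ // single byte, making IFILE_EOF_LENGTH 2 bytes; a CRC32 checksum
+ // contributes another 4 bytes per stream.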
+
+ private List<Integer> spillIndices = new ArrayList<Integer>();
+ private List<ShuffleHeader> shuffleHeaders = new ArrayList<ShuffleHeader>();
+
+ ShuffleHandler shuffleHandler = new ShuffleHandler(this);
+
+ byte[] kvbuffer;
+ IntBuffer kvmeta;
+
+ @Override
+ public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+ super.initialize(outputContext, conf, numOutputs);
+ shuffleHandler.initialize(outputContext, conf);
+ }
+
+ @Override
+ protected void spill(int mstart, int mend)
+ throws IOException, InterruptedException {
+ // Start the shuffleHandler
+ shuffleHandler.start();
+
+ // Don't spill!
+
+ // Keep references to the sort buffers; no data is actually copied
+ this.kvbuffer = super.kvbuffer;
+ this.kvmeta = super.kvmeta;
+
+ // Just save spill-indices for serving later
+ int spindex = mstart;
+ for (int i = 0; i < partitions; ++i) {
+ spillIndices.add(spindex);
+
+ int length = 0;
+ while (spindex < mend &&
+ kvmeta.get(offsetFor(spindex) + PARTITION) == i) {
+
+ final int kvoff = offsetFor(spindex);
+ int keyLen =
+ kvmeta.get(kvoff + VALSTART) - kvmeta.get(kvoff + KEYSTART);
+ int valLen = getInMemVBytesLength(kvoff);
+ length +=
+ (keyLen + WritableUtils.getVIntSize(keyLen)) +
+ (valLen + WritableUtils.getVIntSize(valLen));
+
+ ++spindex;
+ }
+ length += IFILE_EOF_LENGTH;
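+ // e.g. (hypothetical numbers): keyLen = 100 and valLen = 20 each need
+ // a 1-byte vint (values <= 127), so the record contributes
+ // 100 + 1 + 20 + 1 = 122 bytes; IFILE_EOF_LENGTH is then added once
+ // per partition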
+
+ shuffleHeaders.add(
+ new ShuffleHeader(
+ outputContext.getUniqueIdentifier(), // TODO Verify that this is correct.
+ length + IFILE_CHECKSUM_LENGTH, length, i)
+ );
+ LOG.info("shuffleHeader[" + i + "]:" +
+ " rawLen=" + length + " partLen=" + (length + IFILE_CHECKSUM_LENGTH) +
+ " spillIndex=" + spillIndices.get(i));
+ }
+
+ LOG.info("Saved " + spillIndices.size() + " spill-indices and " +
+ shuffleHeaders.size() + " shuffle headers");
+ }
+
+ @Override
+ public InputStream getSortedStream(int partition) {
+ return new SortBufferInputStream(this, partition);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // FIXME
+ //shuffleHandler.stop();
+ }
+
+ @Override
+ public ShuffleHeader getShuffleHeader(int reduce) {
+ return shuffleHeaders.get(reduce);
+ }
+
+ public int getSpillIndex(int partition) {
+ return spillIndices.get(partition);
+ }
+
+}