Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/06/05 06:06:14 UTC
svn commit: r663440 [2/3] - in /hadoop/core/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/io/compress/ src/java/org/apache/hadoop/io/compress/zlib/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test...
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java?rev=663440&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Merger.java Wed Jun 4 21:06:13 2008
@@ -0,0 +1,416 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumFileSystem;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.mapred.IFile.Reader;
+import org.apache.hadoop.mapred.IFile.Writer;
+import org.apache.hadoop.util.PriorityQueue;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+
+class Merger {
+ private static final Log LOG = LogFactory.getLog(Merger.class);
+
+ private static final long PROGRESS_BAR = 10000;
+
+ public static <K extends Object, V extends Object>
+ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class<K> keyClass, Class<V> valueClass,
+ CompressionCodec codec,
+ Path[] inputs, boolean deleteInputs,
+ int mergeFactor, Path tmpDir,
+ RawComparator<K> comparator, Progressable reporter)
+ throws IOException {
+ return
+ new MergeQueue<K, V>(conf, fs, inputs, deleteInputs, codec, comparator,
+ reporter).merge(keyClass, valueClass,
+ mergeFactor, tmpDir);
+ }
+
+ public static <K extends Object, V extends Object>
+ RawKeyValueIterator merge(Configuration conf, FileSystem fs,
+ Class<K> keyClass, Class<V> valueClass,
+ List<Segment<K, V>> segments,
+ int mergeFactor, Path tmpDir,
+ RawComparator<K> comparator, Progressable reporter)
+ throws IOException {
+ return
+ new MergeQueue<K, V>(conf, fs, segments,
+ comparator, reporter).merge(keyClass, valueClass,
+ mergeFactor, tmpDir);
+ }
+
+ public static <K extends Object, V extends Object>
+ void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
+ Progressable progressable)
+ throws IOException {
+ long recordCtr = 0;
+ while(records.next()) {
+ writer.append(records.getKey(), records.getValue());
+
+ if ((++recordCtr % PROGRESS_BAR) == 0) {
+ progressable.progress();
+ }
+ }
+ }
+
+ public static class Segment<K extends Object, V extends Object> {
+ Reader<K, V> reader = null;
+ DataInputBuffer key = new DataInputBuffer();
+ DataInputBuffer value = new DataInputBuffer();
+
+ Configuration conf = null;
+ FileSystem fs = null;
+ Path file = null;
+ boolean preserve = false;
+ CompressionCodec codec = null;
+ long segmentLength = -1;
+
+ public Segment(Configuration conf, FileSystem fs, Path file,
+ CompressionCodec codec, boolean preserve) throws IOException {
+ this.conf = conf;
+ this.fs = fs;
+ this.file = file;
+ this.codec = codec;
+ this.preserve = preserve;
+
+ this.segmentLength = fs.getFileStatus(file).getLen();
+ }
+
+ public Segment(Reader<K, V> reader, boolean preserve) {
+ this.reader = reader;
+ this.preserve = preserve;
+
+ this.segmentLength = reader.getLength();
+ }
+
+ private void init() throws IOException {
+ if (reader == null) {
+ reader = new Reader<K, V>(conf, fs, file, codec);
+ }
+ }
+
+ DataInputBuffer getKey() { return key; }
+ DataInputBuffer getValue() { return value; }
+
+ long getLength() { return segmentLength; }
+
+ boolean next() throws IOException {
+ return reader.next(key, value);
+ }
+
+ void close() throws IOException {
+ reader.close();
+
+ if (!preserve && fs != null) {
+ fs.delete(file, false);
+ }
+ }
+ }
+
+ private static class MergeQueue<K extends Object, V extends Object>
+ extends PriorityQueue implements RawKeyValueIterator {
+ Configuration conf;
+ FileSystem fs;
+ CompressionCodec codec;
+
+ List<Segment<K, V>> segments = new ArrayList<Segment<K,V>>();
+
+ RawComparator<K> comparator;
+
+ private long totalBytesProcessed;
+ private float progPerByte;
+ private Progress mergeProgress = new Progress();
+
+ Progressable reporter;
+
+ DataInputBuffer key;
+ DataInputBuffer value;
+
+ Segment<K, V> minSegment;
+ Comparator<Segment<K, V>> segmentComparator =
+ new Comparator<Segment<K, V>>() {
+ public int compare(Segment<K, V> o1, Segment<K, V> o2) {
+ if (o1.getLength() == o2.getLength()) {
+ return 0;
+ }
+
+ return o1.getLength() < o2.getLength() ? -1 : 1;
+ }
+ };
+
+
+ public MergeQueue(Configuration conf, FileSystem fs,
+ Path[] inputs, boolean deleteInputs,
+ CompressionCodec codec, RawComparator<K> comparator,
+ Progressable reporter)
+ throws IOException {
+ this.conf = conf;
+ this.fs = fs;
+ this.codec = codec;
+ this.comparator = comparator;
+ this.reporter = reporter;
+
+ for (Path file : inputs) {
+ segments.add(new Segment<K, V>(conf, fs, file, codec, !deleteInputs));
+ }
+
+ // Sort segments on file-lengths
+ Collections.sort(segments, segmentComparator);
+ }
+
+
+ public MergeQueue(Configuration conf, FileSystem fs,
+ List<Segment<K, V>> segments, RawComparator<K> comparator,
+ Progressable reporter) {
+ this.conf = conf;
+ this.fs = fs;
+ this.comparator = comparator;
+ this.segments = segments;
+ this.reporter = reporter;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void close() throws IOException {
+ Segment<K, V> segment;
+ while((segment = (Segment<K, V>)pop()) != null) {
+ segment.close();
+ }
+ }
+
+ public DataInputBuffer getKey() throws IOException {
+ return key;
+ }
+
+ public DataInputBuffer getValue() throws IOException {
+ return value;
+ }
+
+ private void adjustPriorityQueue(Segment<K, V> reader) throws IOException{
+ if (reader.next()) {
+ adjustTop();
+ } else {
+ pop();
+ reader.close();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean next() throws IOException {
+ if (size() == 0)
+ return false;
+
+ if (minSegment != null) {
+ //minSegment is non-null for all invocations of next except the first
+ //one. For the first invocation, the priority queue is ready for use
+ //but for the subsequent invocations, first adjust the queue
+ adjustPriorityQueue(minSegment);
+ if (size() == 0) {
+ minSegment = null;
+ return false;
+ }
+ }
+ minSegment = (Segment<K, V>)top();
+
+ key = minSegment.getKey();
+ value = minSegment.getValue();
+
+ totalBytesProcessed += (key.getLength()-key.getPosition()) +
+ (value.getLength()-value.getPosition());
+ mergeProgress.set(totalBytesProcessed * progPerByte);
+
+ return true;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected boolean lessThan(Object a, Object b) {
+ DataInputBuffer key1 = ((Segment<K, V>)a).getKey();
+ DataInputBuffer key2 = ((Segment<K, V>)b).getKey();
+ int s1 = key1.getPosition();
+ int l1 = key1.getLength() - s1;
+ int s2 = key2.getPosition();
+ int l2 = key2.getLength() - s2;
+
+ return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
+ }
+
+ public RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
+ int factor, Path tmpDir)
+ throws IOException {
+ LOG.info("Merging " + segments.size() + " sorted segments");
+
+ //create the MergeStreams from the sorted segment list created in the constructor
+ //and dump the final output to a file
+ int numSegments = segments.size();
+ int origFactor = factor;
+ int passNo = 1;
+ LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
+ do {
+ //get the factor for this pass of merge
+ factor = getPassFactor(factor, passNo, numSegments);
+ List<Segment<K, V>> segmentsToMerge =
+ new ArrayList<Segment<K, V>>();
+ int segmentsConsidered = 0;
+ int numSegmentsToConsider = factor;
+ while (true) {
+ //extract the smallest 'factor' number of segments
+ //Call cleanup on the empty segments (no key/value data)
+ List<Segment<K, V>> mStream =
+ getSegmentDescriptors(numSegmentsToConsider);
+ for (Segment<K, V> segment : mStream) {
+ // Initialize the segment at the last possible moment;
+ // this helps in ensuring we don't use buffers until we need them
+ segment.init();
+
+ if (segment.next()) {
+ segmentsToMerge.add(segment);
+ segmentsConsidered++;
+ }
+ else {
+ segment.close();
+ numSegments--; //we ignore this segment for the merge
+ }
+ }
+ //if we have the desired number of segments
+ //or looked at all available segments, we break
+ if (segmentsConsidered == factor ||
+ segments.size() == 0) {
+ break;
+ }
+
+ numSegmentsToConsider = factor - segmentsConsidered;
+ }
+
+ //feed the streams to the priority queue
+ initialize(segmentsToMerge.size()); clear();
+ for (Segment<K, V> segment : segmentsToMerge) {
+ put(segment);
+ }
+
+ //if we have fewer segments remaining, then just return the
+ //iterator, else do another single-level merge
+ if (numSegments <= factor) {
+ //calculate the length of the remaining segments. Required for
+ //calculating the merge progress
+ long totalBytes = 0;
+ for (int i = 0; i < segmentsToMerge.size(); i++) {
+ totalBytes += segmentsToMerge.get(i).getLength();
+ }
+ if (totalBytes != 0) //being paranoid
+ progPerByte = 1.0f / (float)totalBytes;
+
+ // Reset bytes-processed to track the progress of the final merge
+ totalBytesProcessed = 0;
+
+ LOG.info("Down to the last merge-pass, with " + numSegments +
+ " segments left of total size: " + totalBytes + " bytes");
+ return this;
+ } else {
+ LOG.info("Merging " + segmentsToMerge.size() +
+ " intermediate segments out of a total of " +
+ (segments.size()+segmentsToMerge.size()));
+
+ //we want to spread the creation of temp files on multiple disks if
+ //available under the space constraints
+ long approxOutputSize = 0;
+ for (Segment<K, V> s : segmentsToMerge) {
+ approxOutputSize += s.getLength() +
+ ChecksumFileSystem.getApproxChkSumLength(
+ s.getLength());
+ }
+ Path tmpFilename =
+ new Path(tmpDir, "intermediate").suffix("." + passNo);
+
+ Path outputFile = lDirAlloc.getLocalPathForWrite(
+ tmpFilename.toString(),
+ approxOutputSize, conf);
+
+ Writer<K, V> writer =
+ new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec);
+ writeFile(this, writer, reporter);
+ writer.close();
+
+ //we finished one single level merge; now clean up the priority
+ //queue
+ this.close();
+
+ // Add the newly created segment to the list of segments to be merged
+ Segment<K, V> tempSegment =
+ new Segment<K, V>(conf, fs, outputFile, codec, false);
+ segments.add(tempSegment);
+ numSegments = segments.size();
+ Collections.sort(segments, segmentComparator);
+
+ passNo++;
+ }
+ //we are worried about only the first pass merge factor. So reset the
+ //factor to what it originally was
+ factor = origFactor;
+ } while(true);
+ }
+
+ //HADOOP-591
+ private int getPassFactor(int factor, int passNo, int numSegments) {
+ if (passNo > 1 || numSegments <= factor || factor == 1)
+ return factor;
+ int mod = (numSegments - 1) % (factor - 1);
+ if (mod == 0)
+ return factor;
+ return mod + 1;
+ }
+
+ /** Return (& remove) the requested number of segment descriptors from the
+ * sorted list of segments.
+ */
+ private List<Segment<K, V>> getSegmentDescriptors(int numDescriptors) {
+ if (numDescriptors > segments.size()) {
+ List<Segment<K, V>> subList = new ArrayList<Segment<K,V>>(segments);
+ segments.clear();
+ return subList;
+ }
+
+ List<Segment<K, V>> subList =
+ new ArrayList<Segment<K,V>>(segments.subList(0, numDescriptors));
+ for (int i=0; i < numDescriptors; ++i) {
+ segments.remove(0);
+ }
+ return subList;
+ }
+
+ public Progress getProgress() {
+ return mergeProgress;
+ }
+
+ }
+}
\ No newline at end of file
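
The Merger above is package-private, as are IFile.Writer and RawKeyValueIterator, so its callers live inside org.apache.hadoop.mapred (ReduceTask, below, is the main one). Note getPassFactor() (HADOOP-591): it trims only the first pass so that every later pass merges exactly 'factor' segments; e.g. with 13 segments and factor 10, the first pass merges (13 - 1) % (10 - 1) + 1 = 4 segments, leaving exactly 10 for a single final pass. A minimal usage sketch of the merge/writeFile API follows; the spill paths, Text key/value classes, and the MergerSketch class itself are illustrative assumptions, not part of this commit:

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.util.Progressable;

// Illustrative sketch, not part of r663440: merging sorted on-disk IFiles
// with the new Merger. Spill paths and Text key/value classes are assumed.
class MergerSketch {
  @SuppressWarnings("unchecked")
  static void mergeSpills(Configuration conf) throws IOException {
    FileSystem fs = FileSystem.getLocal(conf);
    Path[] inputs = { new Path("spill0.out"), new Path("spill1.out") };
    Progressable reporter = new Progressable() {
      public void progress() { } // keep the framework informed of liveness
    };
    RawComparator<Text> comparator =
        (RawComparator<Text>) (RawComparator) WritableComparator.get(Text.class);

    // Merge at most 10 segments per pass; consumed inputs are deleted.
    RawKeyValueIterator rIter =
        Merger.merge(conf, fs, Text.class, Text.class,
                     null /* no codec */, inputs, true /* deleteInputs */,
                     10 /* mergeFactor */, new Path("merge-tmp"),
                     comparator, reporter);

    // Stream the merged, sorted records into a single output IFile.
    Writer<Text, Text> writer =
        new Writer<Text, Text>(conf, fs, new Path("merged.out"),
                               Text.class, Text.class, null /* no codec */);
    Merger.writeFile(rIter, writer, reporter);
    writer.close();
  }
}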
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java?rev=663440&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RamManager.java Wed Jun 4 21:06:13 2008
@@ -0,0 +1,48 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+
+class RamManager {
+ volatile private int numReserved = 0;
+ volatile private int size = 0;
+ private final int maxSize;
+
+ public RamManager(Configuration conf) {
+ maxSize = conf.getInt("fs.inmemory.size.mb", 100) * 1024 * 1024;
+ }
+
+ synchronized boolean reserve(long requestedSize) {
+ if (requestedSize > Integer.MAX_VALUE ||
+ (size + requestedSize) > Integer.MAX_VALUE) {
+ return false;
+ }
+
+ if ((size + requestedSize) < maxSize) {
+ size += requestedSize;
+ ++numReserved;
+ return true;
+ }
+ return false;
+ }
+
+ synchronized void unreserve(int requestedSize) {
+ size -= requestedSize;
+ --numReserved;
+ }
+
+ int getUsedMemory() {
+ return size;
+ }
+
+ float getPercentUsed() {
+ return (float)size/maxSize;
+ }
+
+ int getReservedFiles() {
+ return numReserved;
+ }
+
+ int getMemoryLimit() {
+ return maxSize;
+ }
+}
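
RamManager replaces the InMemoryFileSystem bookkeeping: reserve() admits a map-output into the in-memory shuffle budget (fs.inmemory.size.mb, default 100 MB) and unreserve() returns the space once the data has been consumed; ReduceTask, below, triggers an in-memory merge once getPercentUsed()/getReservedFiles() cross their thresholds. A sketch of the expected reserve/unreserve protocol, with a hypothetical fetchIntoRam() standing in for the real HTTP copy:

package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;

// Illustrative sketch, not part of r663440: the reserve/unreserve protocol a
// shuffle copier is expected to follow.
class RamManagerSketch {
  static byte[] fetchIntoRam(int len) { return new byte[len]; } // stub fetch

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("fs.inmemory.size.mb", 100); // 100 MB in-memory budget
    RamManager ram = new RamManager(conf);

    int decompressedLength = 8 * 1024 * 1024; // size advertised by the server
    if (ram.reserve(decompressedLength)) {
      // budget granted: shuffle straight into RAM
      byte[] data = fetchIntoRam(decompressedLength);
      // ... wrap 'data' in an in-memory segment; the in-memory merge
      // eventually gives the space back via ram.unreserve(decompressedLength)
    } else {
      // budget exhausted (or request over 2GB): shuffle to local disk instead
    }
  }
}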
Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java?rev=663440&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RawKeyValueIterator.java Wed Jun 4 21:06:13 2008
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+
+/**
+ * <code>RawKeyValueIterator</code> is an iterator used to iterate over
+ * the raw keys and values during sort/merge of intermediate data.
+ */
+interface RawKeyValueIterator {
+ /**
+ * Gets the current raw key.
+ *
+ * @return the current raw key as a DataInputBuffer
+ * @throws IOException
+ */
+ DataInputBuffer getKey() throws IOException;
+
+ /**
+ * Gets the current raw value.
+ *
+ * @return the current raw value as a DataInputBuffer
+ * @throws IOException
+ */
+ DataInputBuffer getValue() throws IOException;
+
+ /**
+ * Sets up the current key and value (for getKey and getValue).
+ *
+ * @return <code>true</code> if there exists a key/value,
+ * <code>false</code> otherwise.
+ * @throws IOException
+ */
+ boolean next() throws IOException;
+
+ /**
+ * Closes the iterator so that the underlying streams can be closed.
+ *
+ * @throws IOException
+ */
+ void close() throws IOException;
+
+ /** Gets the Progress object; this has a float (0.0 - 1.0)
+ * indicating the fraction of bytes processed by the iterator so far
+ */
+ Progress getProgress();
+}
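
RawKeyValueIterator deliberately exposes raw serialized bytes, leaving deserialization to the consumer (ValuesIterator, moved into Task.java below, does this via a SerializationFactory). A sketch of the canonical consumption loop, assuming Text keys and values; the RawIterSketch class is illustrative, not part of the commit:

package org.apache.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Text;

// Illustrative sketch, not part of the commit: draining a RawKeyValueIterator
// by deserializing the raw bytes back into Writables (Text assumed here).
class RawIterSketch {
  static void drain(RawKeyValueIterator rIter) throws IOException {
    Text key = new Text();
    Text value = new Text();
    while (rIter.next()) { // positions the key/value buffers
      DataInputBuffer rawKey = rIter.getKey();
      key.readFields(rawKey); // buffer is positioned at the key bytes
      DataInputBuffer rawValue = rIter.getValue();
      value.readFields(rawValue);
      // ... consume (key, value); rIter.getProgress().get() is in [0.0, 1.0]
    }
    rIter.close(); // closes the underlying segments/streams
  }
}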
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Jun 4 21:06:13 2008
@@ -22,21 +22,23 @@
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
+import java.net.URLConnection;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Hashtable;
import java.util.LinkedHashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
@@ -48,20 +50,20 @@
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InMemoryFileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.InputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.IFile.*;
+import org.apache.hadoop.mapred.Merger.Segment;
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
@@ -86,6 +88,9 @@
private int numMaps;
private ReduceCopier reduceCopier;
+ private CompressionCodec codec;
+
+
{
getProgress().setStatus("reduce");
setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with
@@ -137,6 +142,17 @@
super(jobFile, taskId, partition);
this.numMaps = numMaps;
}
+
+ private CompressionCodec initCodec() {
+ // check if map-outputs are to be compressed
+ if (conf.getCompressMapOutput()) {
+ Class<? extends CompressionCodec> codecClass =
+ conf.getMapOutputCompressorClass(DefaultCodec.class);
+ return (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
+ }
+
+ return null;
+ }
@Override
public TaskRunner createRunner(TaskTracker tracker) throws IOException {
@@ -191,118 +207,9 @@
return fileList.toArray(new Path[0]);
}
- /** Iterates values while keys match in sorted input. */
- static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
- private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
- private KEY key; // current key
- private KEY nextKey;
- private VALUE value; // current value
- private boolean hasNext; // more w/ this key
- private boolean more; // more in file
- private RawComparator<KEY> comparator;
- private DataOutputBuffer nextValue = new DataOutputBuffer();
- private InputBuffer valIn = new InputBuffer();
- private InputBuffer keyIn = new InputBuffer();
- protected Progressable reporter;
- private Deserializer<KEY> keyDeserializer;
- private Deserializer<VALUE> valDeserializer;
-
- @SuppressWarnings("unchecked")
- public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
- RawComparator<KEY> comparator,
- Class<KEY> keyClass,
- Class<VALUE> valClass, Configuration conf,
- Progressable reporter)
- throws IOException {
- this.in = in;
- this.comparator = comparator;
- this.reporter = reporter;
- SerializationFactory serializationFactory = new SerializationFactory(conf);
- this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
- this.keyDeserializer.open(keyIn);
- this.valDeserializer = serializationFactory.getDeserializer(valClass);
- this.valDeserializer.open(valIn);
- readNextKey();
- key = nextKey;
- nextKey = null; // force new instance creation
- hasNext = more;
- }
-
- /// Iterator methods
-
- public boolean hasNext() { return hasNext; }
-
- public VALUE next() {
- if (!hasNext) {
- throw new NoSuchElementException("iterate past last value");
- }
- try {
- readNextValue();
- readNextKey();
- } catch (IOException ie) {
- throw new RuntimeException("problem advancing", ie);
- }
- reporter.progress();
- return value;
- }
-
- public void remove() { throw new RuntimeException("not implemented"); }
-
- /// Auxiliary methods
-
- /** Start processing next unique key. */
- public void nextKey() throws IOException {
- // read until we find a new key
- while (hasNext) {
- readNextKey();
- }
- // move the next key to the current one
- KEY tmpKey = key;
- key = nextKey;
- nextKey = tmpKey;
- hasNext = more;
- }
-
- /** True iff more keys remain. */
- public boolean more() {
- return more;
- }
-
- /** The current key. */
- public Object getKey() {
- return key;
- }
-
- /**
- * read the next key
- */
- private void readNextKey() throws IOException {
- more = in.next();
- if (more) {
- DataOutputBuffer nextKeyBytes = in.getKey();
- keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getLength());
- nextKey = keyDeserializer.deserialize(nextKey);
- hasNext = key != null && (comparator.compare(key, nextKey) == 0);
- } else {
- hasNext = false;
- }
- }
-
- /**
- * Read the next value
- * @throws IOException
- */
- private void readNextValue() throws IOException {
- nextValue.reset();
- in.getValue().writeUncompressedBytes(nextValue);
- valIn.reset(nextValue.getData(), nextValue.getLength());
- value = valDeserializer.deserialize(value);
- }
- }
-
private class ReduceValuesIterator<KEY,VALUE>
extends ValuesIterator<KEY,VALUE> {
- public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
+ public ReduceValuesIterator (RawKeyValueIterator in,
RawComparator<KEY> comparator,
Class<KEY> keyClass,
Class<VALUE> valClass,
@@ -310,15 +217,17 @@
throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
}
- public void informReduceProgress() {
- reducePhase.set(super.in.getProgress().get()); // update progress
- reporter.progress();
- }
+
@Override
public VALUE next() {
reduceInputValueCounter.increment(1);
return super.next();
}
+
+ public void informReduceProgress() {
+ reducePhase.set(super.in.getProgress().get()); // update progress
+ reporter.progress();
+ }
}
@Override
@@ -332,6 +241,10 @@
startCommunicationThread(umbilical);
FileSystem lfs = FileSystem.getLocal(job);
+
+ // Initialize the codec
+ codec = initCodec();
+
boolean isLocal = true;
if (!job.get("mapred.job.tracker", "local").equals("local")) {
reduceCopier = new ReduceCopier(umbilical, job);
@@ -347,21 +260,21 @@
Path[] mapFiles = getMapFiles(lfs, isLocal);
Path tempDir = new Path(getTaskID().toString());
-
- SequenceFile.Sorter.RawKeyValueIterator rIter;
setPhase(TaskStatus.Phase.SORT);
final Reporter reporter = getReporter(umbilical);
// sort the input file
- SequenceFile.Sorter sorter = new SequenceFile.Sorter(lfs,
- job.getOutputKeyComparator(), job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(), job);
- sorter.setProgressable(reporter);
- rIter = sorter.merge(mapFiles, tempDir,
- !conf.getKeepFailedTaskFiles()); // sort
-
+ LOG.info("Initiating final on-disk merge with " + mapFiles.length +
+ " files");
+ RawKeyValueIterator rIter =
+ Merger.merge(job, lfs,
+ job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
+ codec, mapFiles, !conf.getKeepFailedTaskFiles(),
+ job.getInt("io.sort.factor", 100), tempDir,
+ job.getOutputKeyComparator(), reporter);
+
// free up the data structures
mapOutputFilesOnDisk.clear();
mapFiles = null;
@@ -371,6 +284,7 @@
// make output collector
String finalName = getOutputName(getPartition());
+
FileSystem fs = FileSystem.get(job);
final RecordWriter out =
@@ -421,7 +335,7 @@
done(umbilical);
}
- class ReduceCopier implements MRConstants {
+ class ReduceCopier<K, V> implements MRConstants {
/** Reference to the umbilical object */
private TaskUmbilicalProtocol umbilical;
@@ -479,18 +393,15 @@
/**
* A reference to the in memory file system for writing the map outputs to.
*/
- private InMemoryFileSystem inMemFileSys;
+ //private InMemoryFileSystem inMemFileSys;
+
+ private RamManager ramManager;
/**
* A reference to the local file system for writing the map outputs to.
*/
private FileSystem localFileSys;
-
- /**
- * An instance of the sorter used for doing merge
- */
- private SequenceFile.Sorter sorter;
-
+
/**
* Number of files to merge at a time
*/
@@ -519,7 +430,7 @@
/**
* The threads for fetching the files.
*/
- private MapOutputCopier[] copiers = null;
+ private List<MapOutputCopier> copiers = null;
/**
* The object for metrics reporting.
@@ -540,8 +451,8 @@
/**
* The set of required map outputs
*/
- private Set <Integer> neededOutputs =
- Collections.synchronizedSet(new TreeSet<Integer>());
+ private Set <TaskID> copiedMapOutputs =
+ Collections.synchronizedSet(new TreeSet<TaskID>());
/**
* The set of obsolete map taskids.
@@ -602,7 +513,7 @@
* The maps from which we fail to fetch map-outputs
* even after {@link #maxFetchRetriesPerMap} retries.
*/
- Set<Integer> fetchFailedMaps = new TreeSet<Integer>();
+ Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>();
/**
* A map of taskId -> no. of failed fetches
@@ -620,6 +531,13 @@
*/
private static final int MIN_LOG_TIME = 60000;
+ /**
+ * List of in-memory map-outputs.
+ */
+ private final List<MapOutput> mapOutputsFilesInMemory =
+ Collections.synchronizedList(new LinkedList<MapOutput>());
+
+
/**
* This class contains the methods that should be used for metrics-reporting
* the specific metrics for shuffle. This class actually reports the
@@ -696,7 +614,6 @@
this.size = size;
}
- public int getMapId() { return loc.getMapId(); }
public boolean getSuccess() { return size >= 0; }
public boolean isObsolete() {
return size == OBSOLETE;
@@ -706,28 +623,114 @@
public MapOutputLocation getLocation() { return loc; }
}
- private int extractMapIdFromPathName(Path pathname) {
- //all paths end with map_<id>.out
- String firstPathName = pathname.getName();
- int beginIndex = firstPathName.lastIndexOf("map_");
- int endIndex = firstPathName.lastIndexOf(".out");
- return Integer.parseInt(firstPathName.substring(beginIndex +
- "map_".length(), endIndex));
+ private int nextMapOutputCopierId = 0;
+
+ /**
+ * Abstraction to track a map-output.
+ */
+ private class MapOutputLocation {
+ TaskAttemptID taskAttemptId;
+ TaskID taskId;
+ String ttHost;
+ URL taskOutput;
+
+ public MapOutputLocation(TaskAttemptID taskAttemptId,
+ String ttHost, URL taskOutput) {
+ this.taskAttemptId = taskAttemptId;
+ this.taskId = this.taskAttemptId.getTaskID();
+ this.ttHost = ttHost;
+ this.taskOutput = taskOutput;
+ }
+
+ public TaskAttemptID getTaskAttemptId() {
+ return taskAttemptId;
+ }
+
+ public TaskID getTaskId() {
+ return taskId;
+ }
+
+ public String getHost() {
+ return ttHost;
+ }
+
+ public URL getOutputLocation() {
+ return taskOutput;
+ }
}
- private int nextMapOutputCopierId = 0;
+ /** Describes the output of a map; could either be on disk or in-memory. */
+ private class MapOutput {
+ final TaskID mapId;
+
+ final Path file;
+ final Configuration conf;
+
+ byte[] data;
+ final boolean inMemory;
+ long size;
+
+ public MapOutput(TaskID mapId, Configuration conf, Path file, long size) {
+ this.mapId = mapId;
+
+ this.conf = conf;
+ this.file = file;
+ this.size = size;
+
+ this.data = null;
+
+ this.inMemory = false;
+ }
+
+ public MapOutput(TaskID mapId, byte[] data) {
+ this.mapId = mapId;
+
+ this.file = null;
+ this.conf = null;
+
+ this.data = data;
+ this.size = data.length;
+
+ this.inMemory = true;
+ }
+
+ public void discard() throws IOException {
+ if (inMemory) {
+ data = null;
+ } else {
+ FileSystem fs = file.getFileSystem(conf);
+ fs.delete(file, true);
+ }
+ }
+ }
/** Copies map outputs as they become available */
private class MapOutputCopier extends Thread {
-
+ // basic/unit connection timeout (in milliseconds)
+ private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
+ // default read timeout (in milliseconds)
+ private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
+
private MapOutputLocation currentLocation = null;
private int id = nextMapOutputCopierId++;
private Reporter reporter;
- public MapOutputCopier(Reporter reporter) {
+ // Decompression of map-outputs
+ private CompressionCodec codec = null;
+ private Decompressor decompressor = null;
+
+ public MapOutputCopier(JobConf job, Reporter reporter) {
setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
LOG.debug(getName() + " created");
this.reporter = reporter;
+
+ if (job.getCompressMapOutput()) {
+ Class<? extends CompressionCodec> codecClass =
+ job.getMapOutputCompressorClass(DefaultCodec.class);
+ codec = (CompressionCodec)
+ ReflectionUtils.newInstance(codecClass, job);
+ decompressor = CodecPool.getDecompressor(codec);
+ }
}
/**
@@ -789,7 +792,7 @@
shuffleClientMetrics.successFetch();
} catch (IOException e) {
LOG.warn(reduceTask.getTaskID() + " copy failed: " +
- loc.getMapTaskID() + " from " + loc.getHost());
+ loc.getTaskAttemptId() + " from " + loc.getHost());
LOG.warn(StringUtils.stringifyException(e));
shuffleClientMetrics.failedFetch();
@@ -817,99 +820,269 @@
private long copyOutput(MapOutputLocation loc
) throws IOException, InterruptedException {
// check if we still need to copy the output from this location
- if (!neededOutputs.contains(loc.getMapId()) ||
- obsoleteMapIds.contains(loc.getMapTaskID())) {
+ if (copiedMapOutputs.contains(loc.getTaskId()) ||
+ obsoleteMapIds.contains(loc.getTaskAttemptId())) {
return CopyResult.OBSOLETE;
}
- TaskAttemptID reduceId = reduceTask.getTaskID();
- LOG.info(reduceId + " Copying " + loc.getMapTaskID() +
- " output from " + loc.getHost() + ".");
// a temp filename. If this file gets created in ramfs, we're fine,
// else, we will check the localFS to find a suitable final location
// for this path
+ TaskAttemptID reduceId = reduceTask.getTaskID();
Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
Path.SEPARATOR + getTaskID().getJobID() +
Path.SEPARATOR + reduceId +
Path.SEPARATOR + "output" + "/map_" +
- loc.getMapId() + ".out");
- // a working filename that will be unique to this attempt
- Path tmpFilename = new Path(filename + "-" + id);
- // this copies the map output file
- tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleClientMetrics,
- tmpFilename, lDirAlloc,
- conf, reduceTask.getPartition(),
- STALLED_COPY_TIMEOUT, reporter);
- if (!neededOutputs.contains(loc.getMapId())) {
- if (tmpFilename != null) {
- FileSystem fs = tmpFilename.getFileSystem(conf);
- fs.delete(tmpFilename, true);
- }
- return CopyResult.OBSOLETE;
+ loc.getTaskId().getId() + ".out");
+
+ // Copy the map output to a temp file whose name is unique to this attempt
+ Path tmpMapOutput = new Path(filename+"-"+id);
+
+ // Copy the map output
+ MapOutput mapOutput = getMapOutput(loc, tmpMapOutput);
+ if (mapOutput == null) {
+ throw new IOException("Failed to fetch map-output for " +
+ loc.getTaskAttemptId() + " from " +
+ loc.getHost());
}
- if (tmpFilename == null)
- throw new IOException("File " + filename + "-" + id +
- " not created");
- // This file could have been created in the inmemory
- // fs or the localfs. So need to get the filesystem owning the path.
- FileSystem fs = tmpFilename.getFileSystem(conf);
- long bytes = -1;
+
+ // The size of the map-output
+ long bytes = mapOutput.size;
+
// lock the ReduceTask while we do the rename
synchronized (ReduceTask.this) {
- if (!neededOutputs.contains(loc.getMapId())) {
- fs.delete(tmpFilename, true);
+ if (copiedMapOutputs.contains(loc.getTaskId())) {
+ mapOutput.discard();
return CopyResult.OBSOLETE;
}
+
+ // Special case: discard empty map-outputs
+ if (bytes == 0) {
+ try {
+ mapOutput.discard();
+ } catch (IOException ioe) {
+ LOG.info("Couldn't discard output of " + loc.getTaskId());
+ }
+
+ // Note that we successfully copied the map-output
+ copiedMapOutputs.add(loc.getTaskId());
+ return bytes;
+ }
- bytes = fs.getFileStatus(tmpFilename).getLen();
- //resolve the final filename against the directory where the tmpFile
- //got created
- filename = new Path(tmpFilename.getParent(), filename.getName());
- // if we can't rename the file, something is broken (and IOException
- // will be thrown).
- if (!fs.rename(tmpFilename, filename)) {
- fs.delete(tmpFilename, true);
- bytes = -1;
- throw new IOException("failure to rename map output " +
- tmpFilename);
+ // Process map-output
+ if (mapOutput.inMemory) {
+ // Save it in the synchronized list of map-outputs
+ mapOutputsFilesInMemory.add(mapOutput);
+
+ //Create a thread to do merges. Synchronize access/update to
+ //mergeInProgress
+ if (!mergeInProgress &&
+ ((ramManager.getPercentUsed() >= MAX_INMEM_FILESYS_USE &&
+ ramManager.getReservedFiles() >=
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) ||
+ (mergeThreshold > 0 &&
+ ramManager.getReservedFiles() >= mergeThreshold)) &&
+ mergeThrowable == null) {
+ LOG.info(reduceId + " RamManager " +
+ " is " + ramManager.getPercentUsed() + " full with " +
+ mapOutputsFilesInMemory.size() + " files." +
+ " Triggering merge");
+
+ InMemFSMergeThread m =
+ new InMemFSMergeThread((LocalFileSystem)localFileSys);
+ m.setName("Thread for merging in-memory files");
+ m.setDaemon(true);
+ mergeInProgress = true;
+ m.start();
+ }
+ } else {
+ // Rename the temporary file to the final file;
+ // ensure it is on the same partition
+ tmpMapOutput = mapOutput.file;
+ filename = new Path(tmpMapOutput.getParent(), filename.getName());
+ if (!localFileSys.rename(tmpMapOutput, filename)) {
+ localFileSys.delete(tmpMapOutput, true);
+ bytes = -1;
+ throw new IOException("Failed to rename map output " +
+ tmpMapOutput + " to " + filename);
+ }
+
+ synchronized (mapOutputFilesOnDisk) {
+ mapOutputFilesOnDisk.add(localFileSys.getFileStatus(filename));
+ }
}
+
+ // Note that we successfully copied the map-output
+ copiedMapOutputs.add(loc.getTaskId());
+ }
+
+ return bytes;
+ }
+
+
+ /**
+ * Get the map output from the remote server, either into memory or into a
+ * local file.
+ * We use the file system so that we generate checksum files on the data.
+ * @param mapOutputLoc map-output to be fetched
+ * @param localFilename the filename to write the data into
+ * @return the MapOutput holding the fetched data, in memory or on local disk
+ * @throws IOException when something goes wrong
+ */
+ private MapOutput getMapOutput(MapOutputLocation mapOutputLoc,
+ Path localFilename)
+ throws IOException, InterruptedException {
+ boolean good = false;
+ OutputStream output = null;
+ MapOutput mapOutput = null;
+
+ try {
+ URLConnection connection =
+ mapOutputLoc.getOutputLocation().openConnection();
+ InputStream input = getInputStream(connection, DEFAULT_READ_TIMEOUT,
+ STALLED_COPY_TIMEOUT);
+ //We will put a file in memory if it meets certain criteria:
+ //1. The size of the (decompressed) file should be less than 25% of
+ // the total inmem fs
+ //2. There is space available in the inmem fs
+
+ long decompressedLength =
+ Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));
+ long compressedLength =
+ Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
- LOG.info(reduceId + " done copying " + loc.getMapTaskID() +
- " output from " + loc.getHost() + ".");
- //Create a thread to do merges. Synchronize access/update to
- //mergeInProgress
- if (!mergeInProgress &&
- (inMemFileSys.getPercentUsed() >= MAX_INMEM_FILESYS_USE ||
- (mergeThreshold > 0 &&
- inMemFileSys.getNumFiles(MAP_OUTPUT_FILTER) >=
- mergeThreshold))&&
- mergeThrowable == null) {
- LOG.info(reduceId + " InMemoryFileSystem " +
- inMemFileSys.getUri().toString() +
- " is " + inMemFileSys.getPercentUsed() +
- " full. Triggering merge");
- InMemFSMergeThread m = new InMemFSMergeThread(inMemFileSys,
- (LocalFileSystem)localFileSys, sorter);
- m.setName("Thread for merging in memory files");
- m.setDaemon(true);
- mergeInProgress = true;
- m.start();
+ // Check if we can save the map-output in-memory
+ boolean createInMem = ramManager.reserve(decompressedLength);
+ if (createInMem) {
+ LOG.info("Shuffling " + decompressedLength + " bytes (" +
+ compressedLength + " raw bytes) " +
+ "into RAM-FS from " + mapOutputLoc.getTaskAttemptId());
+
+ // Are map-outputs compressed?
+ if (codec != null) {
+ decompressor.reset();
+ input = codec.createInputStream(input, decompressor);
+ }
+
+ output = new DataOutputBuffer((int)decompressedLength);
+ }
+ else {
+ // Find out a suitable location for the output on local-filesystem
+ localFilename = lDirAlloc.getLocalPathForWrite(
+ localFilename.toUri().getPath(), decompressedLength, conf);
+ LOG.info("Shuffling " + decompressedLength + " bytes (" +
+ compressedLength + " raw bytes) " +
+ "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
+ output = localFileSys.create(localFilename);
+ }
+
+ long bytesRead = 0;
+ try {
+ try {
+ byte[] buf = new byte[64 * 1024];
+
+ int n = input.read(buf, 0, buf.length);
+ while (n > 0) {
+ bytesRead += n;
+ shuffleClientMetrics.inputBytes(n);
+ output.write(buf, 0, n);
+
+ // indicate we're making progress
+ reporter.progress();
+ n = input.read(buf, 0, buf.length);
+ }
+
+ LOG.info("Read " + bytesRead + " bytes from map-output " +
+ "for " + mapOutputLoc.getTaskAttemptId());
+
+ mapOutput =
+ (createInMem) ?
+ new MapOutput(mapOutputLoc.getTaskId(),
+ ((DataOutputBuffer)output).getData()) :
+ new MapOutput(mapOutputLoc.getTaskId(), conf,
+ localFileSys.makeQualified(localFilename),
+ compressedLength);
+
+ } finally {
+ output.close();
+ }
+ } finally {
+ input.close();
+ }
+
+ // Sanity check
+ good = createInMem ? (bytesRead == decompressedLength) :
+ (bytesRead == compressedLength);
+ if (!good) {
+ throw new IOException("Incomplete map output received for " +
+ mapOutputLoc.getTaskAttemptId() + " from " +
+ mapOutputLoc.getOutputLocation() + " (" +
+ bytesRead + " instead of " +
+ decompressedLength + ")"
+ );
+ }
+ } finally {
+ if (!good) {
+ try {
+ if (mapOutput != null) {
+ mapOutput.discard();
+ }
+ } catch (Throwable th) {
+ // IGNORED because we are cleaning up
+ }
}
- neededOutputs.remove(loc.getMapId());
}
- // Check if the map output file hits the local file-system by checking
- // their schemes
- String localFSScheme = localFileSys.getUri().getScheme();
- String outputFileScheme = fs.getUri().getScheme();
- if (localFSScheme.equals(outputFileScheme)) {
- synchronized (mapOutputFilesOnDisk) {
- mapOutputFilesOnDisk.add(fs.getFileStatus(filename));
+ return mapOutput;
+ }
+
+ /**
+ * Connection establishment is attempted multiple times and given up
+ * only on the last failure. Instead of connecting once with a timeout of
+ * X, we try connecting with a smaller timeout x < X multiple times.
+ */
+ private InputStream getInputStream(URLConnection connection,
+ int connectionTimeout,
+ int readTimeout)
+ throws IOException {
+ int unit = 0;
+ if (connectionTimeout < 0) {
+ throw new IOException("Invalid timeout "
+ + "[timeout = " + connectionTimeout + " ms]");
+ } else if (connectionTimeout > 0) {
+ unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
+ ? connectionTimeout
+ : UNIT_CONNECT_TIMEOUT;
+ }
+ // set the read timeout to the total timeout
+ connection.setReadTimeout(readTimeout);
+ // set the connect timeout to the unit-connect-timeout
+ connection.setConnectTimeout(unit);
+ while (true) {
+ try {
+ return connection.getInputStream();
+ } catch (IOException ioe) {
+ // update the total remaining connect-timeout
+ connectionTimeout -= unit;
+
+ // throw an exception if we have waited for timeout amount of time
+ // note that the updated value of timeout is used here
+ if (connectionTimeout == 0) {
+ throw ioe;
+ }
+
+ // reset the connect timeout for the last try
+ if (connectionTimeout < unit) {
+ unit = connectionTimeout;
+ // reset the connect time out for the final connect
+ connection.setConnectTimeout(unit);
+ }
}
}
- return bytes;
}
-
+
}
private void configureClasspath(JobConf conf)
@@ -978,21 +1151,13 @@
this.maxFetchRetriesPerMap = getClosestPowerOf2((this.maxBackoff * 1000
/ BACKOFF_INIT) + 1);
this.mergeThreshold = conf.getInt("mapred.inmem.merge.threshold", 1000);
+
+ // Setup the RamManager
+ ramManager = new RamManager(conf);
+ ramfsMergeOutputSize =
+ (long)(MAX_INMEM_FILESYS_USE * ramManager.getMemoryLimit());
- //we want to distinguish inmem fs instances for different reduces. Hence,
- //append a unique string in the uri for the inmem fs name
- URI uri = URI.create("ramfs://mapoutput" + reduceTask.hashCode());
- inMemFileSys = (InMemoryFileSystem)FileSystem.get(uri, conf);
- LOG.info(reduceTask.getTaskID() + " Created an InMemoryFileSystem, uri: "
- + uri);
- ramfsMergeOutputSize = (long)(MAX_INMEM_FILESYS_USE *
- inMemFileSys.getFSSize());
localFileSys = FileSystem.getLocal(conf);
- //create an instance of the sorter
- sorter =
- new SequenceFile.Sorter(inMemFileSys, conf.getOutputKeyComparator(),
- conf.getMapOutputKeyClass(), conf.getMapOutputValueClass(), conf);
- sorter.setProgressable(getReporter(umbilical));
// hosts -> next contact time
this.penaltyBox = new LinkedHashMap<String, Long>();
@@ -1012,6 +1177,7 @@
this.maxMapRuntime = 0;
}
+ @SuppressWarnings("unchecked")
public boolean fetchOutputs() throws IOException {
final int numOutputs = reduceTask.getNumMaps();
List<MapOutputLocation> knownOutputs =
@@ -1024,24 +1190,18 @@
reduceTask.getProgress().phase();
for (int i = 0; i < numOutputs; i++) {
- neededOutputs.add(i);
copyPhase.addPhase(); // add sub-phase per file
}
- copiers = new MapOutputCopier[numCopiers];
+ copiers = new ArrayList<MapOutputCopier>(numCopiers);
Reporter reporter = getReporter(umbilical);
- // create an instance of the sorter for merging the on-disk files
- SequenceFile.Sorter localFileSystemSorter =
- new SequenceFile.Sorter(localFileSys, conf.getOutputKeyComparator(),
- conf.getMapOutputKeyClass(),
- conf.getMapOutputValueClass(), conf);
- localFileSystemSorter.setProgressable(reporter);
-
+
// start all the copying threads
- for (int i=0; i < copiers.length; i++) {
- copiers[i] = new MapOutputCopier(reporter);
- copiers[i].start();
+ for (int i=0; i < numCopiers; i++) {
+ MapOutputCopier copier = new MapOutputCopier(conf, reporter);
+ copiers.add(copier);
+ copier.start();
}
// start the clock for bandwidth measurement
@@ -1051,9 +1211,8 @@
long lastOutputTime = 0;
IntWritable fromEventId = new IntWritable(0);
- try {
// loop until we get all required outputs
- while (!neededOutputs.isEmpty() && mergeThrowable == null) {
+ while (copiedMapOutputs.size() < numOutputs && mergeThrowable == null) {
currentTime = System.currentTimeMillis();
boolean logNow = false;
@@ -1063,8 +1222,8 @@
}
if (logNow) {
LOG.info(reduceTask.getTaskID() + " Need another "
- + neededOutputs.size() + " map output(s) where "
- + numInFlight + " is already in progress");
+ + (numOutputs - copiedMapOutputs.size()) + " map output(s) "
+ + "where " + numInFlight + " is already in progress");
}
try {
@@ -1129,7 +1288,7 @@
MapOutputLocation loc = locIt.next();
// Do not schedule fetches from OBSOLETE maps
- if (obsoleteMapIds.contains(loc.getMapTaskID())) {
+ if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
locIt.remove();
continue;
}
@@ -1178,9 +1337,11 @@
// make sure that only one thread merges the disk files
localFSMergeInProgress = true;
// start the on-disk-merge process
+ LOG.info(reduceTask.getTaskID() + "We have " +
+ mapOutputFilesOnDisk.size() + " map outputs on disk. " +
+ "Triggering merge of " + ioSortFactor + " files");
LocalFSMerger lfsm =
- new LocalFSMerger((LocalFileSystem)localFileSys,
- localFileSystemSorter);
+ new LocalFSMerger((LocalFileSystem)localFileSys);
lfsm.setName("Thread for merging on-disk files");
lfsm.setDaemon(true);
lfsm.start();
@@ -1221,19 +1382,19 @@
// Note successful fetch for this mapId to invalidate
// (possibly) old fetch-failures
- fetchFailedMaps.remove(cr.getLocation().getMapId());
+ fetchFailedMaps.remove(cr.getLocation().getTaskId());
} else if (cr.isObsolete()) {
//ignore
LOG.info(reduceTask.getTaskID() +
" Ignoring obsolete copy result for Map Task: " +
- cr.getLocation().getMapTaskID() + " from host: " +
+ cr.getLocation().getTaskAttemptId() + " from host: " +
cr.getHost());
} else {
retryFetches.add(cr.getLocation());
// note the failed-fetch
- TaskAttemptID mapTaskId = cr.getLocation().getMapTaskID();
- Integer mapId = cr.getLocation().getMapId();
+ TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
+ TaskID mapId = cr.getLocation().getTaskId();
totalFailures++;
Integer noFailedFetches =
@@ -1351,10 +1512,10 @@
// all done, inform the copiers to exit
synchronized (copiers) {
synchronized (scheduledCopies) {
- for (int i=0; i < copiers.length; i++) {
- copiers[i].interrupt();
- copiers[i] = null;
+ for (MapOutputCopier copier : copiers) {
+ copier.interrupt();
}
+ copiers.clear();
}
}
@@ -1372,52 +1533,63 @@
}
LOG.info(reduceTask.getTaskID() +
" Copying of all map outputs complete. " +
- "Initiating the last merge on the remaining files in " +
- inMemFileSys.getUri());
+ "Initiating the last merge on the remaining files " +
+ "in-memory");
if (mergeThrowable != null) {
//this could happen if the merge that
//was in progress threw an exception
throw mergeThrowable;
}
//initiate merge
- Path[] inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
- if (inMemClosedFiles.length == 0) {
- LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
- inMemFileSys.getUri());
- return neededOutputs.isEmpty();
+ if (mapOutputsFilesInMemory.size() == 0) {
+ LOG.info(reduceTask.getTaskID() + "Nothing to merge from " +
+ "in-memory map-outputs");
+ return (copiedMapOutputs.size() == numOutputs);
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to be
//absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
- //is called (we delete empty sequence files as soon as we see them
+ //is called (we delete empty map-output files as soon as we see them
//in the merge method)
- int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
- Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
- reduceTask.getTaskID(), ramfsMergeOutputSize);
- SequenceFile.Writer writer = sorter.cloneFileAttributes(
- inMemFileSys.makeQualified(inMemClosedFiles[0]),
- localFileSys.makeQualified(outputPath), null);
-
- SequenceFile.Sorter.RawKeyValueIterator rIter = null;
+ TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
+ Path outputPath =
+ localFileSys.makeQualified(
+ mapOutputFile.getInputFileForWrite(mapId,
+ reduceTask.getTaskID(),
+ ramfsMergeOutputSize));
+ Writer writer =
+ new Writer(conf, localFileSys, outputPath,
+ conf.getMapOutputKeyClass(),
+ conf.getMapOutputValueClass(),
+ codec);
+ List<Segment<K, V>> inMemorySegments = createInMemorySegments();
+ int noInMemSegments = inMemorySegments.size();
+ RawKeyValueIterator rIter = null;
try {
- rIter = sorter.merge(inMemClosedFiles, true,
- inMemClosedFiles.length,
- new Path(reduceTask.getTaskID().toString()));
+ rIter = Merger.merge(conf, localFileSys,
+ (Class<K>)conf.getMapOutputKeyClass(),
+ (Class<V>)conf.getMapOutputValueClass(),
+ inMemorySegments, inMemorySegments.size(),
+ new Path(reduceTask.getTaskID().toString()),
+ conf.getOutputKeyComparator(), reporter);
+
+ Merger.writeFile(rIter, writer, reporter);
+ writer.close();
} catch (Exception e) {
//make sure that we delete the ondisk file that we created earlier
//when we invoked cloneFileAttributes
writer.close();
- localFileSys.delete(inMemClosedFiles[0], true);
+ localFileSys.delete(outputPath, true);
throw new IOException (StringUtils.stringifyException(e));
}
- sorter.writeFile(rIter, writer);
- writer.close();
LOG.info(reduceTask.getTaskID() +
- " Merge of the " +inMemClosedFiles.length +
+ " Merge of the " + noInMemSegments +
" files in InMemoryFileSystem complete." +
- " Local file is " + outputPath);
+ " Local file is " + outputPath +
+ " of size " +
+ localFileSys.getFileStatus(outputPath).getLen());
FileStatus status = localFileSys.getFileStatus(outputPath);
synchronized (mapOutputFilesOnDisk) {
@@ -1434,12 +1606,26 @@
return false;
}
}
- return mergeThrowable == null && neededOutputs.isEmpty();
- } finally {
- inMemFileSys.close();
- }
+ return mergeThrowable == null && copiedMapOutputs.size() == numOutputs;
}
+ private List<Segment<K, V>> createInMemorySegments() {
+ List<Segment<K, V>> inMemorySegments =
+ new LinkedList<Segment<K, V>>();
+ synchronized (mapOutputsFilesInMemory) {
+ while(mapOutputsFilesInMemory.size() > 0) {
+ MapOutput mo = mapOutputsFilesInMemory.remove(0);
+
+ Reader<K, V> reader =
+ new InMemoryReader<K, V>(ramManager,
+ mo.data, 0, mo.data.length);
+ Segment<K, V> segment =
+ new Segment<K, V>(reader, true);
+ inMemorySegments.add(segment);
+ }
+ }
+ return inMemorySegments;
+ }
private CopyResult getCopyResult() {
synchronized (copyResults) {
@@ -1501,9 +1687,7 @@
{
URI u = URI.create(event.getTaskTrackerHttp());
String host = u.getHost();
- int port = u.getPort();
- TaskAttemptID taskId = event.getTaskID();
- int mId = event.idWithinJob();
+ TaskAttemptID taskId = event.getTaskAttemptId();
int duration = event.getTaskRunTime();
if (duration > maxMapRuntime) {
maxMapRuntime = duration;
@@ -1511,23 +1695,28 @@
maxFetchRetriesPerMap =
getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
}
- knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
+ URL mapOutputLocation = new URL(event.getTaskTrackerHttp() +
+ "/mapOutput?job=" + taskId.getJobID() +
+ "&map=" + taskId +
+ "&reduce=" + getPartition());
+ knownOutputs.add(new MapOutputLocation(taskId, host,
+ mapOutputLocation));
}
break;
case FAILED:
case KILLED:
case OBSOLETE:
{
- obsoleteMapIds.add(event.getTaskID());
+ obsoleteMapIds.add(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
- " map-task: '" + event.getTaskID() + "'");
+ " map-task: '" + event.getTaskAttemptId() + "'");
}
break;
case TIPFAILED:
{
- neededOutputs.remove(event.idWithinJob());
+ copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
LOG.info("Ignoring output of failed map TIP: '" +
- event.getTaskID() + "'");
+ event.getTaskAttemptId() + "'");
}
break;
}
@@ -1542,17 +1731,15 @@
*/
private class LocalFSMerger extends Thread {
private LocalFileSystem localFileSys;
- private SequenceFile.Sorter sorter;
- public LocalFSMerger(LocalFileSystem fs, SequenceFile.Sorter sorter) {
+ public LocalFSMerger(LocalFileSystem fs) {
this.localFileSys = fs;
- this.sorter = sorter;
}
- @Override
+ @SuppressWarnings("unchecked")
public void run() {
try {
- Path[] mapFiles = new Path[ioSortFactor];
+ List<Path> mapFiles = new ArrayList<Path>();
long approxOutputSize = 0;
int bytesPerSum =
reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
@@ -1565,10 +1752,16 @@
for (int i = 0; i < ioSortFactor; ++i) {
FileStatus filestatus = mapOutputFilesOnDisk.first();
mapOutputFilesOnDisk.remove(filestatus);
- mapFiles[i] = filestatus.getPath();
+ mapFiles.add(filestatus.getPath());
approxOutputSize += filestatus.getLen();
}
}
+
+ // sanity check
+ if (mapFiles.size() == 0) {
+ return;
+ }
+
// add the checksum length
approxOutputSize += ChecksumFileSystem
.getChecksumLength(approxOutputSize,
@@ -1576,29 +1769,42 @@
// 2. Start the on-disk merge process
Path outputPath =
- lDirAlloc.getLocalPathForWrite(mapFiles[0].toString(),
+ lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(),
approxOutputSize, conf)
.suffix(".merged");
- SequenceFile.Writer writer =
- sorter.cloneFileAttributes(mapFiles[0], outputPath, null);
- SequenceFile.Sorter.RawKeyValueIterator iter = null;
+ Writer writer =
+ new Writer(conf, localFileSys, outputPath,
+ conf.getMapOutputKeyClass(),
+ conf.getMapOutputValueClass(),
+ codec);
+ RawKeyValueIterator iter = null;
Path tmpDir = new Path(reduceTask.getTaskID().toString());
+ final Reporter reporter = getReporter(umbilical);
try {
- iter = sorter.merge(mapFiles, true, ioSortFactor, tmpDir);
+ iter = Merger.merge(conf, localFileSys,
+ conf.getMapOutputKeyClass(),
+ conf.getMapOutputValueClass(),
+ codec, mapFiles.toArray(new Path[mapFiles.size()]),
+ true, ioSortFactor, tmpDir,
+ conf.getOutputKeyComparator(), reporter);
} catch (Exception e) {
writer.close();
localFileSys.delete(outputPath, true);
throw new IOException (StringUtils.stringifyException(e));
}
- sorter.writeFile(iter, writer);
+ Merger.writeFile(iter, writer, reporter);
writer.close();
synchronized (mapOutputFilesOnDisk) {
mapOutputFilesOnDisk.add(localFileSys.getFileStatus(outputPath));
}
- LOG.info(reduceTask.getTaskID()
- + " Finished merging map output files on disk.");
+ LOG.info(reduceTask.getTaskID() +
+ " Finished merging " + mapFiles.size() +
+ " map output files on disk of total-size " +
+ approxOutputSize + "." +
+ " Local output file is " + outputPath + " of size " +
+ localFileSys.getFileStatus(outputPath).getLen());
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID()
+ " Merging of the local FS files threw an exception: "
@@ -1613,54 +1819,52 @@
}
private class InMemFSMergeThread extends Thread {
- private InMemoryFileSystem inMemFileSys;
private LocalFileSystem localFileSys;
- private SequenceFile.Sorter sorter;
- public InMemFSMergeThread(InMemoryFileSystem inMemFileSys,
- LocalFileSystem localFileSys, SequenceFile.Sorter sorter) {
- this.inMemFileSys = inMemFileSys;
+ public InMemFSMergeThread(LocalFileSystem localFileSys) {
this.localFileSys = localFileSys;
- this.sorter = sorter;
}
- @Override
+
+ @SuppressWarnings("unchecked")
public void run() {
LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
try {
- Path[] inMemClosedFiles;
- //initiate merge
- synchronized (ReduceTask.this) {
- inMemClosedFiles = inMemFileSys.getFiles(MAP_OUTPUT_FILTER);
- }
- //Note that the above Path[] could be of length 0 if all copies are
- //in flight. So we make sure that we have some 'closed' map
- //output files to merge to get the benefit of in-memory merge
- if (inMemClosedFiles.length >=
- (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
+ if (mapOutputsFilesInMemory.size() >=
+ (int)(MAX_INMEM_FILESYS_USE/MAX_INMEM_FILESIZE_FRACTION)) {
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to
//be absent on the disk currently. So we don't overwrite a prev.
//created spill). Also we need to create the output file now since
//it is not guaranteed that this file will be present after merge
- //is called (we delete empty sequence files as soon as we see them
+ //is called (we delete empty files as soon as we see them
//in the merge method)
//figure out the mapId
- int mapId = extractMapIdFromPathName(inMemClosedFiles[0]);
+ TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
reduceTask.getTaskID(), ramfsMergeOutputSize);
- SequenceFile.Writer writer = sorter.cloneFileAttributes(
- inMemFileSys.makeQualified(inMemClosedFiles[0]),
- localFileSys.makeQualified(outputPath), null);
- SequenceFile.Sorter.RawKeyValueIterator rIter;
+ Writer writer =
+ new Writer(conf, localFileSys, outputPath,
+ conf.getMapOutputKeyClass(),
+ conf.getMapOutputValueClass(),
+ codec);
+
+ List<Segment<K, V>> inMemorySegments = createInMemorySegments();
+ int noInMemorySegments = inMemorySegments.size();
+
+ RawKeyValueIterator rIter = null;
+ final Reporter reporter = getReporter(umbilical);
try {
- rIter = sorter.merge(inMemClosedFiles, true,
- inMemClosedFiles.length,
- new Path(reduceTask.getTaskID().toString()));
+ rIter = Merger.merge(conf, localFileSys,
+ (Class<K>)conf.getMapOutputKeyClass(),
+ (Class<V>)conf.getMapOutputValueClass(),
+ inMemorySegments, inMemorySegments.size(),
+ new Path(reduceTask.getTaskID().toString()),
+ conf.getOutputKeyComparator(), reporter);
if (null == combinerClass) {
- sorter.writeFile(rIter, writer);
+ Merger.writeFile(rIter, writer, reporter);
} else {
combineCollector.setWriter(writer);
combineAndSpill(rIter, reduceCombineInputCounter);
@@ -1675,9 +1879,10 @@
}
writer.close();
LOG.info(reduceTask.getTaskID() +
- " Merge of the " +inMemClosedFiles.length +
- " files in InMemoryFileSystem complete." +
- " Local file is " + outputPath);
+ " Merge of " + noInMemorySegments +
+ " in-memory files complete." +
+ " Local file is " + outputPath + " of size " +
+ localFileSys.getFileStatus(outputPath).getLen());
FileStatus status = localFileSys.getFileStatus(outputPath);
synchronized (mapOutputFilesOnDisk) {
@@ -1685,8 +1890,7 @@
}
}
else {
- LOG.info(reduceTask.getTaskID() + " Nothing to merge from " +
- inMemFileSys.getUri());
+ LOG.info(reduceTask.getTaskID() +
+ " Nothing to merge from in-memory map-outputs");
}
} catch (Throwable t) {
LOG.warn(reduceTask.getTaskID() +
@@ -1699,15 +1903,10 @@
}
}
}
- final private PathFilter MAP_OUTPUT_FILTER = new PathFilter() {
- public boolean accept(Path file) {
- return file.toString().endsWith(".out");
- }
- };
@SuppressWarnings("unchecked")
private void combineAndSpill(
- SequenceFile.Sorter.RawKeyValueIterator kvIter,
+ RawKeyValueIterator kvIter,
Counters.Counter inCounter) throws IOException {
JobConf job = (JobConf)getConf();
Reducer combiner =
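
The ReduceTask hunks above replace the SequenceFile.Sorter-based merges with
the new Merger over IFile segments: Merger.merge returns a RawKeyValueIterator
over the sorted inputs, which Merger.writeFile (or the combiner) then drains
into an IFile.Writer. As a reading aid, here is a minimal, self-contained
sketch of the k-way merge idea that Merger.merge implements; it is not the
Hadoop API. The class name KWayMergeSketch is invented, the toy merges sorted
integer lists with java.util.PriorityQueue, and it ignores the mergeFactor
fan-in, compression, and progress reporting that the real code handles.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {

      /** Merge already-sorted segments into one sorted list. */
      public static List<Integer> merge(List<List<Integer>> segments) {
        // Each heap entry tracks one segment's iterator and its current head.
        class Head {
          final Iterator<Integer> iter;
          int value;
          Head(Iterator<Integer> iter) { this.iter = iter; value = iter.next(); }
        }
        PriorityQueue<Head> heap = new PriorityQueue<Head>(
            Math.max(1, segments.size()), new Comparator<Head>() {
              public int compare(Head a, Head b) {
                return a.value < b.value ? -1 : (a.value == b.value ? 0 : 1);
              }
            });
        for (List<Integer> segment : segments) {
          if (!segment.isEmpty()) {
            heap.add(new Head(segment.iterator()));
          }
        }
        List<Integer> merged = new ArrayList<Integer>();
        while (!heap.isEmpty()) {
          Head smallest = heap.poll();     // smallest current head wins
          merged.add(smallest.value);
          if (smallest.iter.hasNext()) {   // refill from the same segment
            smallest.value = smallest.iter.next();
            heap.add(smallest);
          }
        }
        return merged;
      }
    }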
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Jun 4 21:06:13 2008
@@ -24,7 +24,9 @@
import java.net.URI;
import java.text.NumberFormat;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -40,13 +42,16 @@
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.kfs.KosmosFileSystem;
import org.apache.hadoop.fs.s3.S3FileSystem;
+import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.IFile.Writer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -602,28 +607,141 @@
/**
* OutputCollector for the combiner.
*/
- protected static class CombineOutputCollector implements OutputCollector {
- private SequenceFile.Writer writer;
+ protected static class CombineOutputCollector<K extends Object, V extends Object>
+ implements OutputCollector<K, V> {
+ private Writer<K, V> writer;
private Counters.Counter outCounter;
public CombineOutputCollector(Counters.Counter outCounter) {
this.outCounter = outCounter;
}
- public synchronized void setWriter(SequenceFile.Writer writer) {
+ public synchronized void setWriter(Writer<K, V> writer) {
this.writer = writer;
}
- public synchronized void collect(Object key, Object value)
+ public synchronized void collect(K key, V value)
throws IOException {
outCounter.increment(1);
writer.append(key, value);
}
}
+ /** Iterates values while keys match in sorted input. */
+ static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
+ protected RawKeyValueIterator in; //input iterator
+ private KEY key; // current key
+ private KEY nextKey;
+ private VALUE value; // current value
+ private boolean hasNext; // more w/ this key
+ private boolean more; // more in file
+ private RawComparator<KEY> comparator;
+ protected Progressable reporter;
+ private Deserializer<KEY> keyDeserializer;
+ private Deserializer<VALUE> valDeserializer;
+ private DataInputBuffer keyIn = new DataInputBuffer();
+ private DataInputBuffer valueIn = new DataInputBuffer();
+
+ @SuppressWarnings("unchecked")
+ public ValuesIterator (RawKeyValueIterator in,
+ RawComparator<KEY> comparator,
+ Class<KEY> keyClass,
+ Class<VALUE> valClass, Configuration conf,
+ Progressable reporter)
+ throws IOException {
+ this.in = in;
+ this.comparator = comparator;
+ this.reporter = reporter;
+ SerializationFactory serializationFactory = new SerializationFactory(conf);
+ this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+ this.keyDeserializer.open(keyIn);
+ this.valDeserializer = serializationFactory.getDeserializer(valClass);
+ this.valDeserializer.open(this.valueIn);
+ readNextKey();
+ key = nextKey;
+ nextKey = null; // force new instance creation
+ hasNext = more;
+ }
+
+ RawKeyValueIterator getRawIterator() { return in; }
+
+ /// Iterator methods
+
+ public boolean hasNext() { return hasNext; }
+
+ private int ctr = 0;
+ public VALUE next() {
+ if (!hasNext) {
+ throw new NoSuchElementException("iterate past last value");
+ }
+ try {
+ readNextValue();
+ readNextKey();
+ } catch (IOException ie) {
+ throw new RuntimeException("problem advancing past record #" + ctr, ie);
+ }
+ reporter.progress();
+ return value;
+ }
+
+ public void remove() { throw new RuntimeException("not implemented"); }
+
+ /// Auxiliary methods
+
+ /** Start processing next unique key. */
+ void nextKey() throws IOException {
+ // read until we find a new key
+ while (hasNext) {
+ readNextKey();
+ }
+ ++ctr;
+
+ // move the next key to the current one
+ KEY tmpKey = key;
+ key = nextKey;
+ nextKey = tmpKey;
+ hasNext = more;
+ }
+
+ /** True iff more keys remain. */
+ boolean more() {
+ return more;
+ }
+
+ /** The current key. */
+ KEY getKey() {
+ return key;
+ }
+
+ /**
+ * Read the next key.
+ */
+ private void readNextKey() throws IOException {
+ more = in.next();
+ if (more) {
+ DataInputBuffer nextKeyBytes = in.getKey();
+ keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+ nextKey = keyDeserializer.deserialize(nextKey);
+ hasNext = key != null && (comparator.compare(key, nextKey) == 0);
+ } else {
+ hasNext = false;
+ }
+ }
+
+ /**
+ * Read the next value
+ * @throws IOException
+ */
+ private void readNextValue() throws IOException {
+ DataInputBuffer nextValueBytes = in.getValue();
+ valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+ value = valDeserializer.deserialize(value);
+ }
+ }
+
protected static class CombineValuesIterator<KEY,VALUE>
extends ValuesIterator<KEY,VALUE> {
private final Counters.Counter combineInputCounter;
- public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
+ public CombineValuesIterator(RawKeyValueIterator in,
RawComparator<KEY> comparator, Class<KEY> keyClass,
Class<VALUE> valClass, Configuration conf, Reporter reporter,
Counters.Counter combineInputCounter) throws IOException {
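
The new ValuesIterator above exposes a grouping contract rather than a plain
Iterator: more() and getKey()/nextKey() walk the distinct keys, while
hasNext()/next() walk the values within the current key. A hedged sketch of a
driver loop follows, assuming it lives in the same package (ValuesIterator is
package-private); forEachGroup and GroupHandler are invented names, not part
of this patch.

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    class GroupingDriverSketch {
      interface GroupHandler<K, V> {
        void handle(K key, V value);
      }

      static <K, V> void forEachGroup(Task.ValuesIterator<K, V> values,
                                      GroupHandler<K, V> handler)
          throws IOException {
        while (values.more()) {            // another distinct key remains
          K key = values.getKey();         // the current group's key
          while (values.hasNext()) {       // values that share this key
            handler.handle(key, values.next());
          }
          values.nextKey();                // advance to the next distinct key
        }
      }
    }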
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Wed Jun 4 21:06:13 2008
@@ -79,7 +79,7 @@
/**
* Returns task id.
* @return task id
- * @deprecated use {@link #getTaskID()} instead.
+ * @deprecated use {@link #getTaskAttemptId()} instead.
*/
@Deprecated
public String getTaskId() {
@@ -90,7 +90,7 @@
* Returns task id.
* @return task id
*/
- public TaskAttemptID getTaskID() {
+ public TaskAttemptID getTaskAttemptId() {
return taskId;
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 4 21:06:13 2008
@@ -2346,18 +2346,24 @@
indexIn = fileSys.open(indexFileName);
//seek to the correct offset for the given reduce
- indexIn.seek(reduce * 16);
+ indexIn.seek(reduce * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH);
//read the offset and length of the partition data
long startOffset = indexIn.readLong();
+ long rawPartLength = indexIn.readLong();
long partLength = indexIn.readLong();
indexIn.close();
indexIn = null;
+ //set the custom "Raw-Map-Output-Length" http header to
+ //the raw (decompressed) length
+ response.setHeader(RAW_MAP_OUTPUT_LENGTH, Long.toString(rawPartLength));
+
//set the custom "Map-Output-Length" http header to
//the actual number of bytes being transferred
- response.setHeader(MAP_OUTPUT_LENGTH, Long.toString(partLength));
+ response.setHeader(MAP_OUTPUT_LENGTH,
+ Long.toString(partLength));
//use the same buffersize as used for reading the data from disk
response.setBufferSize(MAX_BYTES_TO_READ);
@@ -2390,6 +2396,10 @@
(partLength - totalRead) < MAX_BYTES_TO_READ
? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
}
+
+ LOG.info("Sent out " + totalRead + " bytes for reduce: " + reduce +
+ " from map: " + mapId + " given " + partLength + "/" +
+ rawPartLength);
} catch (IOException ie) {
TaskTracker tracker =
(TaskTracker) context.getAttribute("task.tracker");
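
The per-reduce index record grows from two longs to three: the partition's
start offset, its raw (decompressed) length, and the part length actually
shipped over HTTP. The reads above imply MAP_OUTPUT_INDEX_RECORD_LENGTH is
3 * 8 = 24 bytes; that value is inferred from this hunk, not quoted from
MapTask. Below is a sketch of the record read using plain java.io in place of
the FSDataInputStream the tracker uses; readIndexRecord is an invented name.

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    class IndexRecordSketch {
      // Three longs per reduce: inferred 24-byte record length.
      static final int RECORD_LENGTH = 3 * 8;

      static long[] readIndexRecord(String indexFile, int reduce)
          throws IOException {
        DataInputStream in = new DataInputStream(new FileInputStream(indexFile));
        try {
          long toSkip = (long) reduce * RECORD_LENGTH;
          while (toSkip > 0) {              // seek to this reduce's record
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
              throw new IOException("could not seek to record " + reduce);
            }
            toSkip -= skipped;
          }
          long startOffset = in.readLong(); // where the partition starts
          long rawLength = in.readLong();   // decompressed size (new field)
          long partLength = in.readLong();  // bytes sent over HTTP
          return new long[] { startOffset, rawLength, partLength };
        } finally {
          in.close();
        }
      }
    }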
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java Wed Jun 4 21:06:13 2008
@@ -171,6 +171,17 @@
}
}
+ /**
+ * Return the correctly-typed {@link Class} of the given object.
+ *
+ * @param o object whose correctly-typed <code>Class</code> is to be obtained
+ * @return the correctly-typed <code>Class</code> of the given object.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> Class<T> getClass(T o) {
+ return (Class<T>)o.getClass();
+ }
+
// methods to support testing
static void clearCache() {
CONSTRUCTOR_CACHE.clear();
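
The new helper centralizes the unchecked cast that Object.getClass() forces on
callers, since it returns Class<?> rather than Class<T>. A usage sketch, where
GetClassExample is an invented class:

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.ReflectionUtils;

    public class GetClassExample {
      public static void main(String[] args) {
        Text value = new Text("example");
        // replaces the per-call-site unchecked cast: (Class<Text>) value.getClass()
        Class<Text> clazz = ReflectionUtils.getClass(value);
        System.out.println(clazz.getName()); // org.apache.hadoop.io.Text
      }
    }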
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/io/compress/TestCodecFactory.java Wed Jun 4 21:06:13 2008
@@ -45,7 +45,7 @@
return null;
}
- public Class getCompressorType() {
+ public Class<? extends Compressor> getCompressorType() {
return null;
}
@@ -70,7 +70,7 @@
return null;
}
- public Class getDecompressorType() {
+ public Class<? extends Decompressor> getDecompressorType() {
return null;
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=663440&r1=663439&r2=663440&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java Wed Jun 4 21:06:13 2008
@@ -98,7 +98,7 @@
TaskCompletionEvent[] events1 = rj1.getTaskCompletionEvents(0);
assertEquals(events0.length, events1.length);
for (int i = 0; i < events0.length; i++) {
- assertEquals(events0[i].getTaskID(), events1[i].getTaskID());
+ assertEquals(events0[i].getTaskAttemptId(), events1[i].getTaskAttemptId());
assertEquals(events0[i].getTaskStatus(), events1[i].getTaskStatus());
}
}