Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/08 02:53:09 UTC
svn commit: r483772 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/io/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/mapred/
Author: cutting
Date: Thu Dec 7 17:53:07 2006
New Revision: 483772
URL: http://svn.apache.org/viewvc?view=rev&rev=483772
Log:
HADOOP-331. Write all map outputs to a single file with an index, rather than to a separate file per reduce task. Contributed by Devaraj.
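In practical terms, each map task now leaves behind one local data file ("file.out") holding every partition's output back to back, plus an index file ("file.out.index") containing two longs per partition: the segment's start offset and its length. A minimal sketch of how a partition's byte range is located, using only calls that appear in this commit (fileSys, indexFileName and reduce are illustrative names):

    // Each index record is 16 bytes: two longs per reduce partition.
    FSDataInputStream index = fileSys.open(indexFileName); // <mapId>/file.out.index
    index.seek(reduce * 16);             // jump to this partition's record
    long startOffset = index.readLong(); // where the segment begins in file.out
    long partLength = index.readLong();  // how many bytes it spans
    index.close();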
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/build.xml
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec 7 17:53:07 2006
@@ -40,6 +40,11 @@
jobs that were running when it was last stopped.
(Sanjay Dahiya via cutting)
+12. HADOOP-331. Write all map outputs to a single file with an index,
+ rather than to a separate file per reduce task. This should both
+ speed the shuffle and make things more scalable.
+ (Devaraj Das via cutting)
+
Release 0.9.1 - 2006-12-06
Modified: lucene/hadoop/trunk/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/build.xml?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/build.xml (original)
+++ lucene/hadoop/trunk/build.xml Thu Dec 7 17:53:07 2006
@@ -366,7 +366,7 @@
<delete dir="${test.log.dir}"/>
<mkdir dir="${test.log.dir}"/>
<junit showoutput="${test.output}" printsummary="yes" haltonfailure="no"
- fork="yes" dir="${basedir}"
+ fork="yes" maxmemory="128m" dir="${basedir}"
errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="test.build.data" value="${test.build.data}"/>
<sysproperty key="hadoop.log.dir" value="${test.log.dir}"/>
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Thu Dec 7 17:53:07 2006
@@ -433,13 +433,6 @@
</property>
<property>
- <name>mapred.combine.buffer.size</name>
- <value>100000</value>
- <description>The number of entries the combining collector caches before
- combining them and writing to disk.</description>
-</property>
-
-<property>
<name>mapred.speculative.execution</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks may
@@ -572,6 +565,13 @@
<name>io.seqfile.compression.type</name>
<value>RECORD</value>
<description>The default compression type for SequenceFile.Writer.
+ </description>
+</property>
+
+<property>
+ <name>map.sort.class</name>
+ <value>org.apache.hadoop.mapred.MergeSorter</value>
+ <description>The default sort class for sorting keys.
</description>
</property>
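A job can substitute its own in-memory sorter by overriding this property; a hedged sketch (the value must name a BufferSorter implementation, and com.example.MySorter is hypothetical):

    JobConf job = new JobConf();
    // map.sort.class must name a class implementing
    // org.apache.hadoop.mapred.BufferSorter
    job.set("map.sort.class", "com.example.MySorter");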
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/SequenceFile.java Thu Dec 7 17:53:07 2006
@@ -37,6 +37,7 @@
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.hadoop.util.MergeSort;
/** Support for flat files of binary key/value pairs. */
public class SequenceFile {
@@ -284,6 +285,42 @@
return writer;
}
+ /**
+ * Construct the preferred type of 'raw' SequenceFile Writer.
+ * @param conf The configuration.
+ * @param out The stream on top of which the writer is to be constructed.
+ * @param keyClass The 'key' type.
+ * @param valClass The 'value' type.
+ * @param compressionType The compression type.
+ * @param codec The compression codec.
+ * @return Returns the handle to the constructed SequenceFile Writer.
+ * @throws IOException
+ */
+ public static Writer
+ createWriter(Configuration conf, FSDataOutputStream out,
+ Class keyClass, Class valClass, CompressionType compressionType,
+ CompressionCodec codec)
+ throws IOException {
+ if ((codec instanceof GzipCodec) &&
+ !NativeCodeLoader.isNativeCodeLoaded() &&
+ !ZlibFactory.isNativeZlibLoaded()) {
+ throw new IllegalArgumentException("SequenceFile doesn't work with " +
+ "GzipCodec without native-hadoop code!");
+ }
+
+ Writer writer = null;
+
+ if (compressionType == CompressionType.NONE) {
+ writer = new Writer(conf, out, keyClass, valClass);
+ } else if (compressionType == CompressionType.RECORD) {
+ writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec);
+ } else if (compressionType == CompressionType.BLOCK){
+ writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec);
+ }
+
+ return writer;
+ }
+
/** The interface to 'raw' values of SequenceFiles. */
public static interface ValueBytes {
@@ -505,6 +542,10 @@
/** Returns the compression codec of data in this file. */
public CompressionCodec getCompressionCodec() { return codec; }
+
+ /** create a sync point */
+ public void sync() throws IOException {
+ }
/** Returns the configuration of this file. */
Configuration getConf() { return conf; }
@@ -686,6 +727,10 @@
val.writeCompressedBytes(out); // 'value' data
}
+
+ public void sync() throws IOException {
+ }
+
} // RecordCompressionWriter
/** Write compressed key/value blocks to a sequence-format file. */
@@ -804,6 +849,10 @@
}
}
+ public void sync() throws IOException {
+ writeBlock();
+ }
+
/** Append a key/value pair. */
public synchronized void append(Writable key, Writable val)
throws IOException {
@@ -1508,6 +1557,8 @@
private WritableComparator comparator;
+ private MergeSort mergeSort; //the implementation of merge sort
+
private Path[] inFiles; // when merging or sorting
private Path outFile;
@@ -1612,6 +1663,7 @@
private int sortPass(boolean deleteInput) throws IOException {
LOG.debug("running sort pass");
SortPass sortPass = new SortPass(); // make the SortPass
+ mergeSort = new MergeSort(sortPass.new SeqFileComparator());
try {
return sortPass.run(deleteInput); // run it
} finally {
@@ -1775,11 +1827,7 @@
int p = pointers[i];
writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
}
- if (writer instanceof SequenceFile.BlockCompressWriter) {
- SequenceFile.BlockCompressWriter bcWriter =
- (SequenceFile.BlockCompressWriter) writer;
- bcWriter.writeBlock();
- }
+ writer.sync();
writer.out.flush();
@@ -1793,50 +1841,14 @@
private void sort(int count) {
System.arraycopy(pointers, 0, pointersCopy, 0, count);
- mergeSort(pointersCopy, pointers, 0, count);
+ mergeSort.mergeSort(pointersCopy, pointers, 0, count);
}
-
- private int compare(int i, int j) {
- return comparator.compare(rawBuffer, keyOffsets[i], keyLengths[i],
- rawBuffer, keyOffsets[j], keyLengths[j]);
- }
-
- private void mergeSort(int src[], int dest[], int low, int high) {
- int length = high - low;
-
- // Insertion sort on smallest arrays
- if (length < 7) {
- for (int i=low; i<high; i++)
- for (int j=i; j>low && compare(dest[j-1], dest[j])>0; j--)
- swap(dest, j, j-1);
- return;
- }
-
- // Recursively sort halves of dest into src
- int mid = (low + high) >> 1;
- mergeSort(dest, src, low, mid);
- mergeSort(dest, src, mid, high);
-
- // If list is already sorted, just copy from src to dest. This is an
- // optimization that results in faster sorts for nearly ordered lists.
- if (compare(src[mid-1], src[mid]) <= 0) {
- System.arraycopy(src, low, dest, low, length);
- return;
+ class SeqFileComparator implements Comparator<IntWritable> {
+ public int compare(IntWritable I, IntWritable J) {
+ return comparator.compare(rawBuffer, keyOffsets[I.get()],
+ keyLengths[I.get()], rawBuffer,
+ keyOffsets[J.get()], keyLengths[J.get()]);
}
-
- // Merge sorted halves (now in src) into dest
- for (int i = low, p = low, q = mid; i < high; i++) {
- if (q>=high || p<mid && compare(src[p], src[q]) <= 0)
- dest[i] = src[p++];
- else
- dest[i] = src[q++];
- }
- }
-
- private void swap(int x[], int a, int b) {
- int t = x[a];
- x[a] = x[b];
- x[b] = t;
}
} // SequenceFile.Sorter.SortPass
@@ -1898,7 +1910,36 @@
s.doSync();
a.add(s);
}
- factor = inNames.length;
+ factor = (inNames.length < factor) ? inNames.length : factor;
+ MergeQueue mQueue = new MergeQueue(a);
+ return mQueue.merge();
+ }
+
+ /**
+ * Merges the contents of files passed in Path[]
+ * @param inNames the array of path names
+ * @param tempDir the directory for creating temp files during merge
+ * @param deleteInputs true if the input files should be deleted when
+ * unnecessary
+ * @return RawKeyValueIterator
+ * @throws IOException
+ */
+ public RawKeyValueIterator merge(Path [] inNames, Path tempDir,
+ boolean deleteInputs)
+ throws IOException {
+ //outFile will basically be used as prefix for temp files for the
+ //intermediate merge outputs
+ this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
+ //get the segments from inNames
+ ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
+ for (int i = 0; i < inNames.length; i++) {
+ SegmentDescriptor s = new SegmentDescriptor(0,
+ fs.getLength(inNames[i]), inNames[i]);
+ s.preserveInput(!deleteInputs);
+ s.doSync();
+ a.add(s);
+ }
+ factor = (inNames.length < factor) ? inNames.length : factor;
MergeQueue mQueue = new MergeQueue(a);
return mQueue.merge();
}
@@ -1916,7 +1957,7 @@
*/
public Writer cloneFileAttributes(FileSystem fileSys, Path inputFile,
Path outputFile, Progressable prog) throws IOException {
- Reader reader = new Reader(fileSys, inputFile, memory/(factor+1), conf);
+ Reader reader = new Reader(fileSys, inputFile, 4096, conf);
boolean compress = reader.isCompressed();
boolean blockCompress = reader.isBlockCompressed();
CompressionCodec codec = reader.getCompressionCodec();
@@ -1944,11 +1985,7 @@
writer.appendRaw(records.getKey().getData(), 0,
records.getKey().getLength(), records.getValue());
}
- if (writer instanceof SequenceFile.BlockCompressWriter) {
- SequenceFile.BlockCompressWriter bcWriter =
- (SequenceFile.BlockCompressWriter) writer;
- bcWriter.writeBlock();
- }
+ writer.sync();
}
/** Merge the provided files.
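The new stream-based createWriter overload above is what allows several logical SequenceFiles to share one physical file, with sync() marking each segment boundary. A minimal usage sketch, assuming the usual org.apache.hadoop.conf/fs/io imports (the path and key/value types are illustrative):

    static void writeSegment(FileSystem fs, Configuration conf)
        throws IOException {
      FSDataOutputStream out = fs.create(new Path("segments.out"));
      SequenceFile.Writer writer =
          SequenceFile.createWriter(conf, out, Text.class, IntWritable.class,
                                    SequenceFile.CompressionType.BLOCK,
                                    new DefaultCodec());
      writer.append(new Text("key"), new IntWritable(1));
      writer.sync(); // BlockCompressWriter flushes its block; others no-op
      out.close();   // the caller owns the stream, as MapTask does
    }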
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java?view=auto&rev=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java Thu Dec 7 17:53:07 2006
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile.ValueBytes;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+
+
+/** This class implements the sort interface using primitive int arrays as
+ * the data structures (that is why this class is called 'BasicType'SorterBase)
+ * @author ddas
+ */
+abstract class BasicTypeSorterBase implements BufferSorter {
+
+ protected DataOutputBuffer keyValBuffer; //the buffer used for storing
+ //key/values
+ protected int[] startOffsets; //the array used to store the start offsets of
+ //keys in keyValBuffer
+ protected int[] keyLengths; //the array used to store the lengths of
+ //keys
+ protected int[] valueLengths; //the array used to store the value lengths
+ protected int[] pointers; //the array of startOffsets's indices. This will
+ //be sorted at the end to contain a sorted array of
+ //indices to offsets
+ protected WritableComparator comparator; //the comparator for the map output
+ protected int count; //the number of key/values
+ //the overhead of the arrays in memory
+ //16 => 4 for keyoffsets, 4 for keylengths, 4 for valueLengths, and
+ //4 for indices into startOffsets array in the
+ //pointers array (ignored the partpointers list itself)
+ private final int BUFFERED_KEY_VAL_OVERHEAD = 16;
+
+ //Implementation of methods of the SorterBase interface
+ //
+ public void configure(JobConf conf) {
+ startOffsets = new int[1024];
+ keyLengths = new int[1024];
+ valueLengths = new int[1024];
+ pointers = new int[1024];
+ comparator = conf.getOutputKeyComparator();
+ }
+
+ public void addKeyValue(int recordOffset, int keyLength, int valLength) {
+ //Add the start offset of the key in the startOffsets array and the
+ //length in the keyLengths array.
+ if (count == startOffsets.length)
+ grow();
+ startOffsets[count] = recordOffset;
+ keyLengths[count] = keyLength;
+ valueLengths[count] = valLength;
+ pointers[count] = count;
+ count++;
+ }
+
+ public void setInputBuffer(DataOutputBuffer buffer) {
+ //store a reference to the keyValBuffer that we need to read during sort
+ this.keyValBuffer = buffer;
+ }
+
+ public long getMemoryUtilized() {
+ return (startOffsets.length) * BUFFERED_KEY_VAL_OVERHEAD;
+ }
+
+ public abstract RawKeyValueIterator sort();
+
+ public void close() {
+ //just set count to 0; we reuse the arrays
+ count = 0;
+ }
+ //A compare method that references the keyValBuffer through the indirect
+ //pointers
+ protected int compare(int i, int j) {
+ return comparator.compare(keyValBuffer.getData(), startOffsets[i],
+ keyLengths[i],
+ keyValBuffer.getData(), startOffsets[j],
+ keyLengths[j]);
+ }
+
+ private void grow() {
+ int newLength = startOffsets.length * 3/2;
+ startOffsets = grow(startOffsets, newLength);
+ keyLengths = grow(keyLengths, newLength);
+ valueLengths = grow(valueLengths, newLength);
+ pointers = grow(pointers, newLength);
+ }
+
+ private int[] grow(int[] old, int newLength) {
+ int[] result = new int[newLength];
+ System.arraycopy(old, 0, result, 0, old.length);
+ return result;
+ }
+} //BasicTypeSorterBase
+
+//Implementation of methods of the RawKeyValueIterator interface. These
+//methods must be invoked to iterate over key/vals after sort is done.
+//
+class MRSortResultIterator implements RawKeyValueIterator {
+
+ private int count;
+ private int[] pointers;
+ private int[] startOffsets;
+ private int[] keyLengths;
+ private int[] valLengths;
+ private int currStartOffsetIndex;
+ private int currIndexInPointers;
+ private DataOutputBuffer keyValBuffer;
+ private DataOutputBuffer key = new DataOutputBuffer();
+ private InMemUncompressedBytes value = new InMemUncompressedBytes();
+
+ public MRSortResultIterator(DataOutputBuffer keyValBuffer,
+ int []pointers, int []startOffsets,
+ int []keyLengths, int []valLengths) {
+ this.count = pointers.length;
+ this.pointers = pointers;
+ this.startOffsets = startOffsets;
+ this.keyLengths = keyLengths;
+ this.valLengths = valLengths;
+ this.keyValBuffer = keyValBuffer;
+ }
+
+ public Progress getProgress() {
+ return null;
+ }
+
+ public DataOutputBuffer getKey() throws IOException {
+ int currKeyOffset = startOffsets[currStartOffsetIndex];
+ int currKeyLength = keyLengths[currStartOffsetIndex];
+ //reuse the same key
+ key.reset();
+ key.write(keyValBuffer.getData(), currKeyOffset, currKeyLength);
+ return key;
+ }
+
+ public ValueBytes getValue() throws IOException {
+ //value[i] occupies valLengths[i] bytes of keyValBuffer, starting at
+ //offset startOffsets[i] + keyLengths[i]
+ value.reset(keyValBuffer,
+ startOffsets[currStartOffsetIndex] + keyLengths[currStartOffsetIndex],
+ valLengths[currStartOffsetIndex]);
+ return value;
+ }
+
+ public boolean next() throws IOException {
+ if (count == currIndexInPointers)
+ return false;
+ currStartOffsetIndex = pointers[currIndexInPointers];
+ currIndexInPointers++;
+ return true;
+ }
+
+ public void close() {
+ return;
+ }
+
+ //An implementation of the ValueBytes interface for the in-memory value
+ //buffers.
+ private class InMemUncompressedBytes implements ValueBytes {
+ private byte[] data;
+ int start;
+ int dataSize;
+ private void reset(DataOutputBuffer d, int start, int length)
+ throws IOException {
+ data = d.getData();
+ this.start = start;
+ dataSize = length;
+ }
+
+ public int getSize() throws IOException {
+ return dataSize;
+ }
+
+ public void writeUncompressedBytes(DataOutputStream outStream)
+ throws IOException {
+ outStream.write(data, start, dataSize);
+ }
+
+ public void writeCompressedBytes(DataOutputStream outStream)
+ throws IllegalArgumentException, IOException {
+ throw
+ new IllegalArgumentException("UncompressedBytes cannot be compressed!");
+ }
+
+ } // InMemUncompressedBytes
+
+} //MRSortResultIterator
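Note that getMemoryUtilized() charges for the capacity of the four int arrays rather than the live record count, since grow() enlarges them by 3/2 regardless of how many slots are filled. A back-of-the-envelope sketch of that overhead (pure arithmetic, values hypothetical):

    int slots = 1 << 20;               // array capacity after several growths
    long overhead = (long) slots * 16; // 16 MB of bookkeeping (4 ints/slot)
    // MapTask spills once keyValBuffer.getLength() + overhead reaches
    // the io.sort.mb limit.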
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java?view=auto&rev=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/BufferSorter.java Thu Dec 7 17:53:07 2006
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+
+/** This class provides a generic sort interface that should be implemented
+ * by specific sort algorithms. The use case is the following:
+ * A user class writes key/value records to a buffer, and finally wants to
+ * sort the buffer. This interface defines methods by which the user class
+ * can update the interface implementation with the offsets of the records
+ * and the lengths of the keys/values. The user class hands the
+ * implementation a reference to the buffer when it wants the records
+ * written to the buffer so far to be sorted. Typically, the user class
+ * decides the point at which sort should
+ * happen based on the memory consumed so far by the buffer and the data
+ * structures maintained by an implementation of this interface. That is why
+ * a method is provided to get the memory consumed so far by the datastructures
+ * in the interface implementation.
+ * @author ddas
+ *
+ */
+interface BufferSorter extends JobConfigurable {
+
+ /** When a key/value is added at a particular offset in the key/value buffer,
+ * this method is invoked by the user class so that the impl of this sort
+ * interface can update its datastructures.
+ * @param recordOffset the offset of the key in the buffer
+ * @param keyLength the length of the key
+ * @param valLength the length of the val in the buffer
+ */
+ public void addKeyValue(int recordOffset, int keyLength, int valLength);
+
+ /** The user class invokes this method to set the buffer that the specific
+ * sort algorithm should "indirectly" sort (generally, sort algorithm impl
+ * should access this buffer via comparators and sort offset-indices to the
+ * buffer).
+ * @param buffer the map output buffer
+ */
+ public void setInputBuffer(DataOutputBuffer buffer);
+
+ /** The framework invokes this method to get the memory consumed so far
+ * by an implementation of this interface.
+ * @return memoryUsed in bytes
+ */
+ public long getMemoryUtilized();
+
+ /** Framework decides when to actually sort
+ */
+ public RawKeyValueIterator sort();
+
+ /** Framework invokes this to signal the sorter to cleanup
+ */
+ public void close();
+}
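This mirrors how MapTask.MapOutputBuffer drives the interface elsewhere in this commit; a condensed sketch of one collect-and-spill cycle (job, key, value and maxBufferSize are assumed to be in scope, and the code must live in org.apache.hadoop.mapred since the interface is package-private):

    BufferSorter sorter = new MergeSorter(); // or whatever map.sort.class names
    sorter.configure(job);                   // picks up the key comparator
    DataOutputBuffer buf = new DataOutputBuffer();

    int keyOffset = buf.getLength();         // serialize key, then value
    key.write(buf);
    int keyLength = buf.getLength() - keyOffset;
    value.write(buf);
    int valLength = buf.getLength() - keyOffset - keyLength;
    sorter.addKeyValue(keyOffset, keyLength, valLength); // bookkeeping only

    if (buf.getLength() + sorter.getMemoryUtilized() >= maxBufferSize) {
      sorter.setInputBuffer(buf);               // hand over the raw bytes
      RawKeyValueIterator iter = sorter.sort(); // indirect sort; null if empty
      // ... spill iter to disk, then reuse the structures ...
      sorter.close();                           // resets the record count
      buf.reset();
    }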
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/CombiningCollector.java Thu Dec 7 17:53:07 2006
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred;
-
-import java.io.*;
-import java.util.*;
-
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.util.ReflectionUtils;
-
-/** Implements partial value reduction during mapping. This can minimize the
- * size of intermediate data. Buffers a list of values for each unique key,
- * then invokes the combiner's reduce method to merge some values before
- * they're transferred to a reduce node. */
-class CombiningCollector implements OutputCollector {
- private int limit;
-
- private int count = 0;
- private Map keyToValues; // the buffer
-
- private JobConf job;
- private OutputCollector out;
- private Reducer combiner;
- private Reporter reporter;
-
- public CombiningCollector(JobConf job, OutputCollector out,
- Reporter reporter) {
- this.job = job;
- this.out = out;
- this.reporter = reporter;
- this.combiner = (Reducer)ReflectionUtils.newInstance(job.getCombinerClass(),
- job);
- this.keyToValues = new TreeMap(job.getOutputKeyComparator());
- this.limit = job.getInt("mapred.combine.buffer.size", 100000);
- }
-
- public synchronized void collect(WritableComparable key, Writable value)
- throws IOException {
-
- // buffer new value in map
- ArrayList values = (ArrayList)keyToValues.get(key);
- Writable valueClone = WritableUtils.clone(value, job);
- if (values == null) {
- // this is a new key, so create a new list
- values = new ArrayList(1);
- values.add(valueClone);
- Writable keyClone = WritableUtils.clone(key, job);
- keyToValues.put(keyClone, values);
- } else {
- // other values for this key, so just add.
- values.add(valueClone);
- }
-
- count++;
-
- if (count >= this.limit) { // time to flush
- flush();
- }
- }
-
- public synchronized void flush() throws IOException {
- Iterator pairs = keyToValues.entrySet().iterator();
- while (pairs.hasNext()) {
- Map.Entry pair = (Map.Entry)pairs.next();
- combiner.reduce((WritableComparable)pair.getKey(),
- ((ArrayList)pair.getValue()).iterator(),
- out, reporter);
- }
- keyToValues.clear();
- count = 0;
- }
-
- public synchronized void close() throws IOException {
- combiner.close();
- }
-
-}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Dec 7 17:53:07 2006
@@ -115,7 +115,7 @@
String reduceId = "reduce_" + newId();
for (int i = 0; i < mapIds.size(); i++) {
String mapId = (String)mapIds.get(i);
- Path mapOut = this.mapoutputFile.getOutputFile(mapId, 0);
+ Path mapOut = this.mapoutputFile.getOutputFile(mapId);
Path reduceIn = this.mapoutputFile.getInputFile(i, reduceId);
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create " +
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java Thu Dec 7 17:53:07 2006
@@ -32,11 +32,36 @@
/** Create a local map output file name.
* @param mapTaskId a map task id
- * @param partition a reduce partition
*/
- public Path getOutputFile(String mapTaskId, int partition)
+ public Path getOutputFile(String mapTaskId)
throws IOException {
- return conf.getLocalPath(mapTaskId+"/part-"+partition+".out");
+ return conf.getLocalPath(mapTaskId+"/file.out");
+ }
+
+ /** Create a local map output index file name.
+ * @param mapTaskId a map task id
+ */
+ public Path getOutputIndexFile(String mapTaskId)
+ throws IOException {
+ return conf.getLocalPath(mapTaskId+"/file.out.index");
+ }
+
+ /** Create a local map spill file name.
+ * @param mapTaskId a map task id
+ * @param spillNumber the spill number
+ */
+ public Path getSpillFile(String mapTaskId, int spillNumber)
+ throws IOException {
+ return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out");
+ }
+
+ /** Create a local map spill index file name.
+ * @param mapTaskId a map task id
+ * @param spillNumber the spill number
+ */
+ public Path getSpillIndexFile(String mapTaskId, int spillNumber)
+ throws IOException {
+ return conf.getLocalPath(mapTaskId+"/spill" +spillNumber+".out.index");
}
/** Create a local reduce input file name.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Thu Dec 7 17:53:07 2006
@@ -19,11 +19,14 @@
package org.apache.hadoop.mapred;
import java.io.*;
+import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.SequenceFile.Sorter;
+import org.apache.hadoop.io.SequenceFile.Sorter.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,6 +35,9 @@
import org.apache.commons.logging.*;
import org.apache.hadoop.metrics.Metrics;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
+
/** A Map task. */
class MapTask extends Task {
@@ -127,14 +133,117 @@
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException {
- // open output files
- final int partitions = job.getNumReduceTasks();
- final SequenceFile.Writer[] outs = new SequenceFile.Writer[partitions];
+ Reporter reporter = getReporter(umbilical, getProgress());
+
+ MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
+
+ final RecordReader rawIn = // open input
+ job.getInputFormat().getRecordReader
+ (FileSystem.get(job), split, job, reporter);
+
+ RecordReader in = new RecordReader() { // wrap in progress reporter
+ private float perByte = 1.0f /(float)split.getLength();
+
+ public WritableComparable createKey() {
+ return rawIn.createKey();
+ }
+
+ public Writable createValue() {
+ return rawIn.createValue();
+ }
+
+ public synchronized boolean next(Writable key, Writable value)
+ throws IOException {
+
+ float progress = // compute progress
+ (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
+ reportProgress(umbilical, progress);
+ long beforePos = getPos();
+ boolean ret = rawIn.next(key, value);
+ myMetrics.mapInput(getPos() - beforePos);
+ return ret;
+ }
+ public long getPos() throws IOException { return rawIn.getPos(); }
+ public void close() throws IOException { rawIn.close(); }
+ };
+
+ MapRunnable runner =
+ (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
+
try {
- Reporter reporter = getReporter(umbilical, getProgress());
- FileSystem localFs = FileSystem.getNamed("local", job);
- CompressionCodec codec = null;
- CompressionType compressionType = CompressionType.NONE;
+ runner.run(in, collector, reporter); // run the map
+ } finally {
+ in.close(); // close input
+ //check whether the length of the key/value buffer is 0. If not, then
+ //we need to spill that to disk. Note that we reset the key/val buffer
+ //upon each spill (so a length > 0 means that we have not spilled yet)
+ if (((MapOutputBuffer)collector).keyValBuffer.getLength() > 0) {
+ ((MapOutputBuffer)collector).sortAndSpillToDisk();
+ }
+ //merge the partitions from the spilled files and create one output
+ collector.mergeParts();
+ //close
+ collector.close();
+ }
+ done(umbilical);
+ }
+
+ public void setConf(Configuration conf) {
+ if (conf instanceof JobConf) {
+ this.conf = (JobConf) conf;
+ } else {
+ this.conf = new JobConf(conf);
+ }
+ this.mapOutputFile.setConf(this.conf);
+ }
+
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ class MapOutputBuffer implements OutputCollector {
+
+ private final int partitions;
+ private Partitioner partitioner;
+ private TaskUmbilicalProtocol umbilical;
+ private JobConf job;
+ private Reporter reporter;
+
+ private DataOutputBuffer keyValBuffer; //the buffer where key/val will
+ //be stored before they are
+ //spilled to disk
+ private int maxBufferSize; //the max amount of in-memory space after which
+ //we will spill the keyValBuffer to disk
+ private int numSpills; //maintains the no. of spills to disk done so far
+
+ private FileSystem localFs;
+ private CompressionCodec codec;
+ private CompressionType compressionType;
+ private Class keyClass;
+ private Class valClass;
+ private WritableComparator comparator;
+ private BufferSorter []sortImpl;
+ private SequenceFile.Writer writer;
+ private FSDataOutputStream out;
+ private FSDataOutputStream indexOut;
+ private long segmentStart;
+ public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
+ Reporter reporter) throws IOException {
+ this.partitions = job.getNumReduceTasks();
+ this.partitioner = (Partitioner)ReflectionUtils.newInstance(
+ job.getPartitionerClass(), job);
+ maxBufferSize = job.getInt("io.sort.mb", 100) * 1024 * 1024;
+ keyValBuffer = new DataOutputBuffer();
+
+ this.umbilical = umbilical;
+ this.job = job;
+ this.reporter = reporter;
+ this.comparator = job.getOutputKeyComparator();
+ this.keyClass = job.getMapOutputKeyClass();
+ this.valClass = job.getMapOutputValueClass();
+ this.localFs = FileSystem.getNamed("local", job);
+ this.codec = null;
+ this.compressionType = CompressionType.NONE;
if (job.getCompressMapOutput()) {
// find the kind of compression to do, defaulting to record
compressionType = job.getMapOutputCompressionType();
@@ -145,106 +254,249 @@
codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, job);
}
- for (int i = 0; i < partitions; i++) {
- Path filename = mapOutputFile.getOutputFile(getTaskId(), i);
- outs[i] =
- SequenceFile.createWriter(localFs, job, filename,
- job.getMapOutputKeyClass(),
- job.getMapOutputValueClass(),
- compressionType, codec, reporter);
- LOG.info("opened "+this.mapOutputFile.getOutputFile(getTaskId(), i).getName());
- }
-
- final Partitioner partitioner =
- (Partitioner)ReflectionUtils.newInstance(job.getPartitionerClass(), job);
-
- OutputCollector partCollector = new OutputCollector() { // make collector
- public synchronized void collect(WritableComparable key,
- Writable value)
- throws IOException {
- SequenceFile.Writer out = outs[partitioner.getPartition(key, value, partitions)];
- long beforePos = out.getLength();
- out.append(key, value);
- reportProgress(umbilical);
- myMetrics.mapOutput(out.getLength() - beforePos);
- }
- };
-
- OutputCollector collector = partCollector;
-
- boolean combining = job.getCombinerClass() != null;
- if (combining) { // add combining collector
- collector = new CombiningCollector(job, partCollector, reporter);
+ sortImpl = new BufferSorter[partitions];
+ for (int i = 0; i < partitions; i++)
+ sortImpl[i] = (BufferSorter)ReflectionUtils.newInstance(
+ job.getClass("map.sort.class", MergeSorter.class,
+ BufferSorter.class), job);
+ }
+ public void startPartition(int partNumber) throws IOException {
+ //We create the sort output as multiple sequence files within a spilled
+ //file. So we create a writer for each partition.
+ segmentStart = out.getPos();
+ writer =
+ SequenceFile.createWriter(job, out, job.getMapOutputKeyClass(),
+ job.getMapOutputValueClass(), compressionType, codec);
+ }
+ private void endPartition(int partNumber) throws IOException {
+ //Need to write syncs especially if block compression is in use
+ //We also update the index file to contain the part offsets per
+ //spilled file
+ writer.sync();
+ indexOut.writeLong(segmentStart);
+ //we also store 0 length key/val segments to make the merge phase easier.
+ indexOut.writeLong(out.getPos()-segmentStart);
+ }
+
+ public void collect(WritableComparable key,
+ Writable value) throws IOException {
+ synchronized (this) {
+ //dump the key/value to buffer
+ int keyOffset = keyValBuffer.getLength();
+ key.write(keyValBuffer);
+ int keyLength = keyValBuffer.getLength() - keyOffset;
+ value.write(keyValBuffer);
+ int valLength = keyValBuffer.getLength() - (keyOffset + keyLength);
+
+ int partNumber = partitioner.getPartition(key, value, partitions);
+ sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
+
+ reportProgress(umbilical);
+ myMetrics.mapOutput(keyValBuffer.getLength() - keyOffset);
+
+ //now check whether we need to spill to disk
+ long totalMem = 0;
+ for (int i = 0; i < partitions; i++)
+ totalMem += sortImpl[i].getMemoryUtilized();
+ if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {
+ sortAndSpillToDisk();
+ keyValBuffer.reset();
+ for (int i = 0; i < partitions; i++)
+ sortImpl[i].close();
+ }
}
-
- final RecordReader rawIn = // open input
- job.getInputFormat().getRecordReader
- (FileSystem.get(job), split, job, reporter);
-
- RecordReader in = new RecordReader() { // wrap in progress reporter
- private float perByte = 1.0f /(float)split.getLength();
-
- public WritableComparable createKey() {
- return rawIn.createKey();
- }
+ }
+
+ //sort, combine and spill to disk
+ private void sortAndSpillToDisk() throws IOException {
+ synchronized (this) {
+ Path filename = mapOutputFile.getSpillFile(getTaskId(), numSpills);
+ //we just create the FSDataOutputStream object here.
+ out = localFs.create(filename);
+ Path indexFilename = mapOutputFile.getSpillIndexFile(getTaskId(),
+ numSpills);
+ indexOut = localFs.create(indexFilename);
+ LOG.info("opened "+
+ mapOutputFile.getSpillFile(getTaskId(), numSpills).getName());
- public Writable createValue() {
- return rawIn.createValue();
- }
+ //invoke the sort
+ for (int i = 0; i < partitions; i++) {
+ sortImpl[i].setInputBuffer(keyValBuffer);
+ RawKeyValueIterator rIter = sortImpl[i].sort();
- public synchronized boolean next(Writable key, Writable value)
- throws IOException {
-
- float progress = // compute progress
- (float)Math.min((rawIn.getPos()-split.getStart())*perByte, 1.0f);
- reportProgress(umbilical, progress);
-
- long beforePos = getPos();
- boolean ret = rawIn.next(key, value);
- myMetrics.mapInput(getPos() - beforePos);
- return ret;
+ startPartition(i);
+ if (rIter != null) {
+ //invoke the combiner if one is defined
+ if (job.getCombinerClass() != null) {
+ //we instantiate and close the combiner for each partition. This
+ //is required for streaming where the combiner runs as a separate
+ //process and we want to make sure that the combiner process has
+ //got all the input key/val, processed, and output the result
+ //key/vals before we write the partition header in the output file
+ Reducer combiner = (Reducer)ReflectionUtils.newInstance(
+ job.getCombinerClass(), job);
+ // make collector
+ OutputCollector combineCollector = new OutputCollector() {
+ public void collect(WritableComparable key, Writable value)
+ throws IOException {
+ synchronized (this) {
+ writer.append(key, value);
+ }
+ }
+ };
+ combineAndSpill(rIter, combiner, combineCollector);
+ combiner.close();
+ }
+ else //just spill the sorted data
+ spill(rIter);
}
- public long getPos() throws IOException { return rawIn.getPos(); }
- public void close() throws IOException { rawIn.close(); }
- };
-
- MapRunnable runner =
- (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
+ endPartition(i);
+ }
+ numSpills++;
+ out.close();
+ indexOut.close();
+ }
+ }
+
+ private void combineAndSpill(RawKeyValueIterator resultIter,
+ Reducer combiner, OutputCollector combineCollector) throws IOException {
+ //combine the key/value obtained from the offset & indices arrays.
+ CombineValuesIterator values = new CombineValuesIterator(resultIter,
+ comparator, keyClass, valClass, umbilical, job);
+ while (values.more()) {
+ combiner.reduce(values.getKey(), values, combineCollector, reporter);
+ values.nextKey();
+ }
+ }
+
+ private void spill(RawKeyValueIterator resultIter) throws IOException {
+ Writable key = null;
+ Writable value = null;
try {
- runner.run(in, collector, reporter); // run the map
+ key = (WritableComparable)ReflectionUtils.newInstance(keyClass, job);
+ value = (Writable)ReflectionUtils.newInstance(valClass, job);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
- if (combining) { // flush combiner
- ((CombiningCollector)collector).flush();
- }
+ DataInputBuffer keyIn = new DataInputBuffer();
+ DataInputBuffer valIn = new DataInputBuffer();
+ DataOutputBuffer valOut = new DataOutputBuffer();
+ while (resultIter.next()) {
+ keyIn.reset(resultIter.getKey().getData(),
+ resultIter.getKey().getLength());
+ key.readFields(keyIn);
+ valOut.reset();
+ (resultIter.getValue()).writeUncompressedBytes(valOut);
+ valIn.reset(valOut.getData(), valOut.getLength());
+ value.readFields(valIn);
- } finally {
- if (combining) {
- ((CombiningCollector)collector).close();
- }
- in.close(); // close input
+ writer.append(key, value);
}
- } finally {
- for (int i = 0; i < partitions; i++) { // close output
- if (outs[i] != null) {
- outs[i].close();
+ }
+
+ public void mergeParts() throws IOException {
+ Path finalOutputFile = mapOutputFile.getOutputFile(getTaskId());
+ Path finalIndexFile = mapOutputFile.getOutputIndexFile(getTaskId());
+
+ if (numSpills == 1) { //the spill is the final output
+ Path spillPath = mapOutputFile.getSpillFile(getTaskId(), 0);
+ Path spillIndexPath = mapOutputFile.getSpillIndexFile(getTaskId(), 0);
+ localFs.rename(spillPath, finalOutputFile);
+ localFs.rename(spillIndexPath, finalIndexFile);
+ return;
+ }
+
+ //The output stream for the final single output file
+ FSDataOutputStream finalOut = localFs.create(finalOutputFile, true,
+ 4096);
+ //The final index file output stream
+ FSDataOutputStream finalIndexOut = localFs.create(finalIndexFile, true,
+ 4096);
+ long segmentStart;
+
+ if (numSpills == 0) {
+ //create dummy files
+ for (int i = 0; i < partitions; i++) {
+ segmentStart = finalOut.getPos();
+ SequenceFile.createWriter(job, finalOut,
+ job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
+ compressionType, codec);
+ finalIndexOut.writeLong(segmentStart);
+ finalIndexOut.writeLong(finalOut.getPos() - segmentStart);
}
+ finalOut.close();
+ finalIndexOut.close();
+ return;
+ }
+
+ Path [] filename = new Path[numSpills];
+ Path [] indexFileName = new Path[numSpills];
+ FSDataInputStream in[] = new FSDataInputStream[numSpills];
+ FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
+
+ for(int i = 0; i < numSpills; i++) {
+ filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
+ in[i] = localFs.open(filename[i]);
+ indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
+ indexIn[i] = localFs.open(indexFileName[i]);
+ }
+
+ //create a sorter object as we need access to the SegmentDescriptor
+ //class and merge methods
+ Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
+ sorter.setFactor(numSpills);
+
+ for (int parts = 0; parts < partitions; parts++){
+ List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
+ for(int i = 0; i < numSpills; i++) {
+ long segmentOffset = indexIn[i].readLong();
+ long segmentLength = indexIn[i].readLong();
+ SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
+ segmentLength, filename[i]);
+ s.preserveInput(true);
+ s.doSync();
+ segmentList.add(i, s);
+ }
+ segmentStart = finalOut.getPos();
+ SequenceFile.Writer writer = SequenceFile.createWriter(job, finalOut,
+ job.getMapOutputKeyClass(), job.getMapOutputValueClass(),
+ compressionType, codec);
+ sorter.writeFile(sorter.merge(segmentList), writer);
+ //add a sync block - required esp. for block compression to ensure
+ //partition data don't span partition boundaries
+ writer.sync();
+ //when we write the offset/length to the final index file, we write
+ //longs for both. This helps us to reliably seek directly to the
+ //offset/length for a partition when we start serving the byte-ranges
+ //to the reduces. We probably waste some space in the file by doing
+ //this as opposed to writing VLong but it helps us later on.
+ finalIndexOut.writeLong(segmentStart);
+ finalIndexOut.writeLong(finalOut.getPos()-segmentStart);
+ }
+ finalOut.close();
+ finalIndexOut.close();
+ //cleanup
+ for(int i = 0; i < numSpills; i++) {
+ in[i].close(); localFs.delete(filename[i]);
+ indexIn[i].close(); localFs.delete(indexFileName[i]);
}
}
- done(umbilical);
- }
-
- public void setConf(Configuration conf) {
- if (conf instanceof JobConf) {
- this.conf = (JobConf) conf;
- } else {
- this.conf = new JobConf(conf);
+
+ public void close() throws IOException {
+ //empty for now
+ }
+
+ private class CombineValuesIterator extends ValuesIterator {
+
+ public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
+ WritableComparator comparator, Class keyClass,
+ Class valClass, TaskUmbilicalProtocol umbilical,
+ Configuration conf)
+ throws IOException {
+ super(in, comparator, keyClass, valClass, umbilical, conf);
+ }
}
- this.mapOutputFile.setConf(this.conf);
- }
-
- public Configuration getConf() {
- return this.conf;
}
-
}
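Putting the pieces together, the local files a map task produces under this scheme look roughly like this (a sketch of the layout for P partitions, not normative):

    // spillN.out       : [part 0 seqfile][part 1 seqfile]...[part P-1 seqfile]
    // spillN.out.index : P records of (startOffset, length), two longs each
    // file.out         : merged; one sync'ed seqfile segment per partition
    // file.out.index   : P records of (startOffset, length), two longs each
    //
    // Reduce r's data in file.out is the byte range
    // [startOffset(r), startOffset(r) + length(r)), which is exactly what
    // TaskTracker's MapOutputServlet serves below.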
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java?view=auto&rev=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MergeSorter.java Thu Dec 7 17:53:07 2006
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Comparator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.util.MergeSort;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+
+/** This class implements the sort method from BasicTypeSorterBase class as
+ * MergeSort. Note that this class is really a wrapper over the actual
+ * mergesort implementation that is there in the util package. The main intent
+ * of providing this class is to setup the input data for the util.MergeSort
+ * algo so that the latter doesn't need to bother about the various data
+ * structures that have been created for the Map output but rather concentrate
+ * on the core algorithm (thereby allowing easy integration of a mergesort
+ * implementation). The bridge between this class and the util.MergeSort class
+ * is the Comparator.
+ * @author ddas
+ *
+ */
+class MergeSorter extends BasicTypeSorterBase
+implements Comparator<IntWritable> {
+
+ /** The sort method derived from BasicTypeSorterBase and overridden here*/
+ public RawKeyValueIterator sort() {
+ MergeSort m = new MergeSort(this);
+ int count = super.count;
+ if (count == 0) return null;
+ int [] pointers = (int[])super.pointers;
+ int [] pointersCopy = new int[count];
+ System.arraycopy(pointers, 0, pointersCopy, 0, count);
+ m.mergeSort(pointers, pointersCopy, 0, count);
+ return new MRSortResultIterator(super.keyValBuffer, pointersCopy,
+ super.startOffsets, super.keyLengths, super.valueLengths);
+ }
+ /** The implementation of the compare method from Comparator. This basically
+ * forwards the call to the super class's compare. Note that
+ * Comparator.compare takes objects as inputs and so the int values are
+ * wrapped in (reusable) IntWritables from the class util.MergeSort
+ * @param i the first index, wrapped in an IntWritable
+ * @param j the second index, wrapped in an IntWritable
+ * @return int as per the specification of Comparator.compare
+ */
+ public int compare (IntWritable i, IntWritable j) {
+ return super.compare(i.get(), j.get());
+ }
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Dec 7 17:53:07 2006
@@ -121,7 +121,7 @@
}
/** Iterates values while keys match in sorted input. */
- private class ValuesIterator implements Iterator {
+ static class ValuesIterator implements Iterator {
private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
private WritableComparable key; // current key
private Writable value; // current value
@@ -181,9 +181,6 @@
public WritableComparable getKey() { return key; }
private void getNext() throws IOException {
- reducePhase.set(in.getProgress().get()); // update progress
- reportProgress(umbilical);
-
Writable lastKey = key; // save previous key
try {
key = (WritableComparable)ReflectionUtils.newInstance(keyClass, this.conf);
@@ -211,13 +208,25 @@
}
}
}
+ private class ReduceValuesIterator extends ValuesIterator {
+ public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
+ WritableComparator comparator, Class keyClass,
+ Class valClass, TaskUmbilicalProtocol umbilical,
+ Configuration conf)
+ throws IOException {
+ super(in, comparator, keyClass, valClass, umbilical, conf);
+ }
+ public void informReduceProgress() {
+ reducePhase.set(super.in.getProgress().get()); // update progress
+ reportProgress(super.umbilical);
+ }
+ }
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException {
Class valueClass = job.getMapOutputValueClass();
Reducer reducer = (Reducer)ReflectionUtils.newInstance(
job.getReducerClass(), job);
- reducer.configure(job);
FileSystem lfs = FileSystem.getNamed("local", job);
copyPhase.complete(); // copy is already complete
@@ -262,7 +271,7 @@
// sort the input file
SequenceFile.Sorter sorter =
new SequenceFile.Sorter(lfs, comparator, valueClass, job);
- rIter = sorter.sortAndIterate(mapFiles, tempDir,
+ rIter = sorter.merge(mapFiles, tempDir,
!conf.getKeepFailedTaskFiles()); // sort
} finally {
@@ -300,12 +309,14 @@
try {
Class keyClass = job.getMapOutputKeyClass();
Class valClass = job.getMapOutputValueClass();
- ValuesIterator values = new ValuesIterator(rIter, comparator, keyClass,
- valClass, umbilical, job);
+ ReduceValuesIterator values = new ReduceValuesIterator(rIter, comparator,
+ keyClass, valClass, umbilical, job);
+ values.informReduceProgress();
while (values.more()) {
myMetrics.reduceInput();
reducer.reduce(values.getKey(), values, collector, reporter);
values.nextKey();
+ values.informReduceProgress();
}
} finally {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Dec 7 17:53:07 2006
@@ -21,6 +21,7 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.io.*;
import org.apache.hadoop.metrics.Metrics;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -1525,6 +1526,7 @@
* @author Owen O'Malley
*/
public static class MapOutputServlet extends HttpServlet {
+ private final int MAX_BYTES_TO_READ = 64 * 1024;
public void doGet(HttpServletRequest request,
HttpServletResponse response
) throws ServletException, IOException {
@@ -1535,20 +1537,37 @@
}
ServletContext context = getServletContext();
int reduce = Integer.parseInt(reduceId);
- byte[] buffer = new byte[64*1024];
+ byte[] buffer = new byte[MAX_BYTES_TO_READ];
OutputStream outStream = response.getOutputStream();
JobConf conf = (JobConf) context.getAttribute("conf");
FileSystem fileSys =
(FileSystem) context.getAttribute("local.file.system");
- Path filename = conf.getLocalPath(mapId+"/part-"+reduce+".out");
- response.setContentLength((int) fileSys.getLength(filename));
- InputStream inStream = null;
+ //open index file
+ Path indexFileName = conf.getLocalPath(mapId+"/file.out.index");
+ FSDataInputStream in = fileSys.open(indexFileName);
+ //seek to the correct offset for the given reduce
+ in.seek(reduce * 16);
+
+ //read the offset and length of the partition data
+ long startOffset = in.readLong();
+ long partLength = in.readLong();
+
+ in.close();
+
+ Path mapOutputFileName = conf.getLocalPath(mapId+"/file.out");
+
+ response.setContentLength((int) partLength);
+ FSDataInputStream inStream = null;
// true iff IOException was caused by attempt to access input
boolean isInputException = true;
try {
- inStream = fileSys.open(filename);
+ inStream = fileSys.open(mapOutputFileName);
+ inStream.seek(startOffset);
try {
- int len = inStream.read(buffer);
+ int totalRead = 0;
+ int len = inStream.read(buffer, 0,
+ partLength < MAX_BYTES_TO_READ
+ ? (int)partLength : MAX_BYTES_TO_READ);
while (len > 0) {
try {
outStream.write(buffer, 0, len);
@@ -1556,7 +1575,11 @@
isInputException = false;
throw ie;
}
- len = inStream.read(buffer);
+ totalRead += len;
+ if (totalRead == partLength) break;
+ len = inStream.read(buffer, 0,
+ (partLength - totalRead) < MAX_BYTES_TO_READ
+ ? (int)(partLength - totalRead) : MAX_BYTES_TO_READ);
}
} finally {
inStream.close();
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java?view=auto&rev=483772
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/MergeSort.java Thu Dec 7 17:53:07 2006
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.util.Comparator;
+import org.apache.hadoop.io.IntWritable;
+
+/** An implementation of the core algorithm of MergeSort. */
+public class MergeSort {
+ //Reusable IntWritables
+ IntWritable I = new IntWritable(0);
+ IntWritable J = new IntWritable(0);
+
+ private Comparator comparator; //the comparator that the algo should use
+
+ public MergeSort(Comparator comparator) {
+ this.comparator = comparator;
+ }
+
+ public void mergeSort(int src[], int dest[], int low, int high) {
+ int length = high - low;
+
+ // Insertion sort on smallest arrays
+ if (length < 7) {
+ for (int i=low; i<high; i++) {
+ for (int j=i;j > low ; j--) {
+ I.set(dest[j-1]);
+ J.set(dest[j]);
+ if (comparator.compare(I, J)>0)
+ swap(dest, j, j-1);
+ }
+ }
+ return;
+ }
+
+ // Recursively sort halves of dest into src
+ int mid = (low + high) >> 1;
+ mergeSort(dest, src, low, mid);
+ mergeSort(dest, src, mid, high);
+
+ I.set(src[mid-1]);
+ J.set(src[mid]);
+ // If list is already sorted, just copy from src to dest. This is an
+ // optimization that results in faster sorts for nearly ordered lists.
+ if (comparator.compare(I, J) <= 0) {
+ System.arraycopy(src, low, dest, low, length);
+ return;
+ }
+
+ // Merge sorted halves (now in src) into dest
+ for (int i = low, p = low, q = mid; i < high; i++) {
+ if (q < high && p < mid) {
+ I.set(src[p]);
+ J.set(src[q]);
+ }
+ if (q>=high || p<mid && comparator.compare(I, J) <= 0)
+ dest[i] = src[p++];
+ else
+ dest[i] = src[q++];
+ }
+ }
+
+ private void swap(int x[], int a, int b) {
+ int t = x[a];
+ x[a] = x[b];
+ x[b] = t;
+ }
+}
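As a self-contained illustration of the contract (src is scratch, dest both supplies the initial order and receives the sorted result, mirroring the callers above), a small demo class; all names here are local to the sketch:

    import java.util.Comparator;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.util.MergeSort;

    public class MergeSortDemo {
      public static void main(String[] args) {
        final int[] data = { 30, 10, 20 };
        int[] pointers = { 0, 1, 2 };       // indices into data
        int[] scratch = { 0, 1, 2 };        // callers copy dest into src first
        MergeSort m = new MergeSort(new Comparator<IntWritable>() {
          public int compare(IntWritable i, IntWritable j) {
            return data[i.get()] - data[j.get()];
          }
        });
        m.mergeSort(scratch, pointers, 0, pointers.length);
        // pointers is now { 1, 2, 0 }: data[1] <= data[2] <= data[0]
      }
    }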
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?view=diff&rev=483772&r1=483771&r2=483772
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Thu Dec 7 17:53:07 2006
@@ -233,16 +233,6 @@
}
public void close() throws IOException {
- MapOutputFile namer = new MapOutputFile();
- namer.setConf(conf);
- FileSystem fs = FileSystem.get(conf);
- Path output = namer.getOutputFile(taskId, 0);
- assertTrue("map output exists " + output, fs.exists(output));
- SequenceFile.Reader rdr =
- new SequenceFile.Reader(fs, output, conf);
- assertEquals("is map output compressed " + output, compress,
- rdr.isCompressed());
- rdr.close();
}
}
@@ -264,7 +254,7 @@
) throws IOException {
if (first) {
first = false;
- Path input = conf.getLocalPath(taskId+"/all.2");
+ Path input = conf.getLocalPath(taskId+"/map_0.out");
FileSystem fs = FileSystem.get(conf);
assertTrue("reduce input exists " + input, fs.exists(input));
SequenceFile.Reader rdr =