Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/13 05:15:33 UTC
svn commit: r636623 - in /hadoop/core/trunk: ./
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapred/lib/aggregate/
src/test/org/apache/hadoop/mapred/
Author: ddas
Date: Wed Mar 12 21:15:29 2008
New Revision: 636623
URL: http://svn.apache.org/viewvc?rev=636623&view=rev
Log:
HADOOP-2399. Input key and value to combiner and reducer are reused. Contributed by Owen O'Malley.
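The practical consequence of this change is that the framework now deserializes each key and value into the same preallocated objects, so user code must copy anything it wants to keep across calls to next(). A minimal sketch of the resulting defensive-copy pattern against the mapred API of this era (the reducer class itself is hypothetical, not part of this commit):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class BufferingReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {
      public void reduce(Text key, Iterator<Text> values,
                         OutputCollector<Text, Text> output, Reporter reporter)
          throws IOException {
        List<Text> buffered = new ArrayList<Text>();
        while (values.hasNext()) {
          // The iterator reuses one Text instance; new Text(...) copies its
          // bytes. Adding values.next() directly would fill the list with
          // references to a single, repeatedly overwritten object.
          buffered.add(new Text(values.next()));
        }
        for (Text v : buffered) {
          output.collect(key, v);
        }
      }
    }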
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 12 21:15:29 2008
@@ -91,6 +91,9 @@
HADOOP-2758. Reduce buffer copies in DataNode when data is read from
HDFS, without negatively affecting read throughput. (rangadi)
+ HADOOP-2399. Input key and value to combiner and reducer is reused.
+ (Owen O'Malley via ddas).
+
BUG FIXES
HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Mar 12 21:15:29 2008
@@ -681,16 +681,19 @@
//empty for now
}
- private class CombineValuesIterator extends ValuesIterator {
+ private class CombineValuesIterator<KEY,VALUE>
+ extends ValuesIterator<KEY,VALUE> {
public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in,
- RawComparator comparator, Class keyClass,
- Class valClass, Configuration conf, Reporter reporter)
+ RawComparator<KEY> comparator,
+ Class<KEY> keyClass,
+ Class<VALUE> valClass, Configuration conf,
+ Reporter reporter)
throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
}
- public Object next() {
+ public VALUE next() {
combineInputCounter.increment(1);
return super.next();
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Mar 12 21:15:29 2008
@@ -34,6 +34,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
@@ -51,7 +52,6 @@
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.ChecksumFileSystem;
-import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.InputBuffer;
import org.apache.hadoop.io.IntWritable;
@@ -67,6 +67,7 @@
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -184,50 +185,60 @@
}
/** Iterates values while keys match in sorted input. */
- static class ValuesIterator implements Iterator {
+ static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
- private Object key; // current key
- private Object value; // current value
+ private KEY key; // current key
+ private KEY nextKey;
+ private VALUE value; // current value
private boolean hasNext; // more w/ this key
private boolean more; // more in file
- private RawComparator comparator;
- private DataOutputBuffer valOut = new DataOutputBuffer();
+ private RawComparator<KEY> comparator;
+ private DataOutputBuffer nextValue = new DataOutputBuffer();
private InputBuffer valIn = new InputBuffer();
private InputBuffer keyIn = new InputBuffer();
- protected Reporter reporter;
- private Deserializer keyDeserializer;
- private Deserializer valDeserializer;
+ protected Progressable reporter;
+ private Deserializer<KEY> keyDeserializer;
+ private Deserializer<VALUE> valDeserializer;
@SuppressWarnings("unchecked")
public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
- RawComparator comparator, Class keyClass,
- Class valClass, Configuration conf,
- Reporter reporter)
+ RawComparator<KEY> comparator,
+ Class<KEY> keyClass,
+ Class<VALUE> valClass, Configuration conf,
+ Progressable reporter)
throws IOException {
this.in = in;
this.comparator = comparator;
this.reporter = reporter;
+ nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf);
+ value = (VALUE) ReflectionUtils.newInstance(valClass, conf);
SerializationFactory serializationFactory = new SerializationFactory(conf);
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(valIn);
- getNext();
+ readNextKey();
+ key = nextKey;
+ nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf);
+ hasNext = more;
}
/// Iterator methods
public boolean hasNext() { return hasNext; }
- public Object next() {
- Object result = value; // save value
+ public VALUE next() {
+ if (!hasNext) {
+ throw new NoSuchElementException("iterate past last value");
+ }
try {
- getNext(); // move to next
- } catch (IOException e) {
- throw new RuntimeException(e);
+ readNextValue();
+ readNextKey();
+ } catch (IOException ie) {
+ throw new RuntimeException("problem advancing", ie);
}
reporter.progress();
- return result; // return saved value
+ return value;
}
public void remove() { throw new RuntimeException("not implemented"); }
@@ -235,45 +246,62 @@
/// Auxiliary methods
/** Start processing next unique key. */
- public void nextKey() {
- while (hasNext) { next(); } // skip any unread
+ public void nextKey() throws IOException {
+ // read until we find a new key
+ while (hasNext) {
+ readNextKey();
+ }
+ // move the next key to the current one
+ KEY tmpKey = key;
+ key = nextKey;
+ nextKey = tmpKey;
hasNext = more;
}
/** True iff more keys remain. */
- public boolean more() { return more; }
+ public boolean more() {
+ return more;
+ }
/** The current key. */
- public Object getKey() { return key; }
+ public Object getKey() {
+ return key;
+ }
- @SuppressWarnings("unchecked")
- private void getNext() throws IOException {
- Object lastKey = key; // save previous key
+ /**
+ * read the next key
+ */
+ private void readNextKey() throws IOException {
more = in.next();
if (more) {
- //de-serialize the raw key/value
- keyIn.reset(in.getKey().getData(), in.getKey().getLength());
- key = keyDeserializer.deserialize(null); // force new object
- valOut.reset();
- (in.getValue()).writeUncompressedBytes(valOut);
- valIn.reset(valOut.getData(), valOut.getLength());
- value = valDeserializer.deserialize(null); // force new object
-
- if (lastKey == null) {
- hasNext = true;
- } else {
- hasNext = (comparator.compare(key, lastKey) == 0);
- }
+ DataOutputBuffer nextKeyBytes = in.getKey();
+ keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getLength());
+ keyDeserializer.deserialize(nextKey);
+ hasNext = key != null && (comparator.compare(key, nextKey) == 0);
} else {
hasNext = false;
}
}
+
+ /**
+ * Read the next value
+ * @throws IOException
+ */
+ private void readNextValue() throws IOException {
+ nextValue.reset();
+ in.getValue().writeUncompressedBytes(nextValue);
+ valIn.reset(nextValue.getData(), nextValue.getLength());
+ valDeserializer.deserialize(value);
+ }
}
- private class ReduceValuesIterator extends ValuesIterator {
+
+ private class ReduceValuesIterator<KEY,VALUE>
+ extends ValuesIterator<KEY,VALUE> {
public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
- RawComparator comparator, Class keyClass,
- Class valClass,
- Configuration conf, Reporter reporter)
+ RawComparator<KEY> comparator,
+ Class<KEY> keyClass,
+ Class<VALUE> valClass,
+ Configuration conf, Progressable reporter)
throws IOException {
super(in, comparator, keyClass, valClass, conf, reporter);
}
@@ -281,7 +309,7 @@
reducePhase.set(super.in.getProgress().get()); // update progress
reporter.progress();
}
- public Object next() {
+ public VALUE next() {
reduceInputValueCounter.increment(1);
return super.next();
}
@@ -809,7 +837,7 @@
return CopyResult.OBSOLETE;
}
- bytes = fs.getLength(tmpFilename);
+ bytes = fs.getFileStatus(tmpFilename).getLen();
//resolve the final filename against the directory where the tmpFile
//got created
filename = new Path(tmpFilename.getParent(), filename.getName());
@@ -1065,7 +1093,7 @@
// all reduce-tasks swamping the same tasktracker
Collections.shuffle(knownOutputs, this.random);
- Iterator locIt = knownOutputs.iterator();
+ Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
currentTime = System.currentTimeMillis();
while (locIt.hasNext()) {
@@ -1255,7 +1283,7 @@
// the failure is due to a lost tasktracker (causes many
// unnecessary backoffs). If not, we only take a small hit
// polling the tasktracker a few more times
- Iterator locIt = knownOutputs.iterator();
+ Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
while (locIt.hasNext()) {
MapOutputLocation loc = (MapOutputLocation)locIt.next();
if (cr.getHost().equals(loc.getHost())) {
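The heart of the ReduceTask change is visible in readNextKey() and readNextValue(): deserialize(nextKey) and deserialize(value) fill objects allocated once in the constructor, where the old getNext() passed null to force a fresh object per record. A self-contained sketch of that reuse pattern against the serializer classes this diff uses (the harness around them is illustrative, not from the commit):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.InputBuffer;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.serializer.Deserializer;
    import org.apache.hadoop.io.serializer.SerializationFactory;

    public class DeserializeIntoReused {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        SerializationFactory factory = new SerializationFactory(conf);
        Deserializer<Text> deser = factory.getDeserializer(Text.class);

        // Stage some serialized bytes, as the merge output would hold them.
        DataOutputBuffer bytes = new DataOutputBuffer();
        new Text("k1").write(bytes);

        InputBuffer in = new InputBuffer();
        deser.open(in);
        in.reset(bytes.getData(), bytes.getLength());

        Text reused = new Text();                 // allocated once, like nextKey
        Text result = deser.deserialize(reused);  // fills the existing object
        // For Writable types this returns the very object passed in.
        System.out.println(result + ", reused: " + (result == reused));
      }
    }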
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java Wed Mar 12 21:15:29 2008
@@ -80,7 +80,7 @@
*/
public void addNextValue(Object val) {
if (this.numItems <= this.maxNumItems) {
- uniqItems.put(val, "1");
+ uniqItems.put(val.toString(), "1");
this.numItems = this.uniqItems.size();
}
}
@@ -122,4 +122,4 @@
}
return retv;
}
-}
\ No newline at end of file
+}
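The one-line UniqValueCount fix follows from the same reuse semantics: the val handed to addNextValue may now be a reused, mutable object, so storing it directly as a map key keeps an aliased reference, while val.toString() keys on an immutable copy. A toy demonstration of the aliasing hazard (standalone illustration, not Hadoop code):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.io.Text;

    public class ReusedKeyHazard {
      public static void main(String[] args) {
        Map<Object, String> uniqItems = new HashMap<Object, String>();
        Text reused = new Text();      // stands in for the reused value object

        reused.set("a");
        uniqItems.put(reused, "1");    // stores a reference, hashed on "a"
        reused.set("b");               // mutates the key already in the map
        uniqItems.put(reused, "1");    // second entry, same object

        // Two entries, but both keys are one object now reading "b";
        // the entry inserted under "a" is unreachable.
        System.out.println(uniqItems.size());             // 2
        System.out.println(uniqItems.get(new Text("a"))); // null

        // The commit's fix: key on an immutable copy instead.
        Map<String, String> fixed = new HashMap<String, String>();
        reused.set("a"); fixed.put(reused.toString(), "1");
        reused.set("b"); fixed.put(reused.toString(), "1");
        System.out.println(fixed.size());                 // 2, correctly
      }
    }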
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Wed Mar 12 21:15:29 2008
@@ -218,8 +218,8 @@
// match the real string. check if there are 3 instances or not.
Path result = new Path(TEST_ROOT_DIR + "/test.txt");
{
- BufferedReader file = new BufferedReader(new InputStreamReader(
- FileSystem.getLocal(conf).open(result)));
+ BufferedReader file = new BufferedReader
+ (new InputStreamReader(FileSystem.getLocal(conf).open(result)));
String line = file.readLine();
while (line != null) {
if (!testStr.equals(line))
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=636623&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Wed Mar 12 21:15:29 2008
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This test exercises the ValueIterator.
+ */
+public class TestReduceTask extends TestCase {
+
+ static class NullProgress implements Progressable {
+ public void progress() { }
+ }
+
+ private static class Pair {
+ String key;
+ String value;
+ Pair(String k, String v) {
+ key = k;
+ value = v;
+ }
+ }
+ private static Pair[][] testCases =
+ new Pair[][]{
+ new Pair[]{
+ new Pair("k1", "v1"),
+ new Pair("k2", "v2"),
+ new Pair("k3", "v3"),
+ new Pair("k3", "v4"),
+ new Pair("k4", "v5"),
+ new Pair("k5", "v6"),
+ },
+ new Pair[]{
+ new Pair("", "v1"),
+ new Pair("k1", "v2"),
+ new Pair("k2", "v3"),
+ new Pair("k2", "v4"),
+ },
+ new Pair[] {},
+ new Pair[]{
+ new Pair("k1", "v1"),
+ new Pair("k1", "v2"),
+ new Pair("k1", "v3"),
+ new Pair("k1", "v4"),
+ }
+ };
+
+ public void runValueIterator(Path tmpDir, Pair[] vals,
+ Configuration conf) throws IOException {
+ FileSystem fs = tmpDir.getFileSystem(conf);
+ Path path = new Path(tmpDir, "data.in");
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+ Text.class,
+ Text.class);
+ for(Pair p: vals) {
+ writer.append(new Text(p.key), new Text(p.value));
+ }
+ writer.close();
+ SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
+ Text.class, conf);
+ SequenceFile.Sorter.RawKeyValueIterator rawItr =
+ sorter.merge(new Path[]{path}, false, tmpDir);
+ ReduceTask.ValuesIterator valItr =
+ new ReduceTask.ValuesIterator(rawItr, WritableComparator.get(Text.class),
+ Text.class, Text.class,
+ conf, new NullProgress());
+ int i = 0;
+ while (valItr.more()) {
+ Object key = valItr.getKey();
+ String keyString = key.toString();
+ // make sure it matches!
+ assertEquals(vals[i].key, keyString);
+ // must have at least 1 value!
+ assertTrue(valItr.hasNext());
+ while (valItr.hasNext()) {
+ String valueString = valItr.next().toString();
+ // make sure the values match
+ assertEquals(vals[i].value, valueString);
+ // make sure the keys match
+ assertEquals(vals[i].key, valItr.getKey().toString());
+ i += 1;
+ }
+ // make sure the key hasn't changed under the hood
+ assertEquals(keyString, valItr.getKey().toString());
+ valItr.nextKey();
+ }
+ assertEquals(vals.length, i);
+ }
+
+ public void testValueIterator() throws Exception {
+ Path tmpDir = new Path("build/test/test.reduce.task");
+ Configuration conf = new Configuration();
+ for (Pair[] testCase: testCases) {
+ runValueIterator(tmpDir, testCase, conf);
+ }
+ }
+}
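For completeness: a unit test like this was typically run from trunk with the Ant harness; the target and property names below follow the standard Hadoop build.xml of the time and are stated as an assumption rather than taken from this commit:

    ant -Dtestcase=TestReduceTask test-core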