You are viewing a plain-text version of this content. The hyperlink to the canonical version is not preserved in plain text.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/13 05:15:33 UTC

svn commit: r636623 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/lib/aggregate/ src/test/org/apache/hadoop/mapred/

Author: ddas
Date: Wed Mar 12 21:15:29 2008
New Revision: 636623

URL: http://svn.apache.org/viewvc?rev=636623&view=rev
Log:
HADOOP-2399. Input key and value objects passed to the combiner and reducer are reused. Contributed by Owen O'Malley.

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Mar 12 21:15:29 2008
@@ -91,6 +91,9 @@
     HADOOP-2758. Reduce buffer copies in DataNode when data is read from
     HDFS, without negatively affecting read throughput. (rangadi)
 
+    HADOOP-2399. Input key and value to combiner and reducer is reused.
+    (Owen O'Malley via ddas). 
+
   BUG FIXES
 
     HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Wed Mar 12 21:15:29 2008
@@ -681,16 +681,19 @@
       //empty for now
     }
     
-    private class CombineValuesIterator extends ValuesIterator {
+    private class CombineValuesIterator<KEY,VALUE> 
+            extends ValuesIterator<KEY,VALUE> {
         
       public CombineValuesIterator(SequenceFile.Sorter.RawKeyValueIterator in, 
-                                   RawComparator comparator, Class keyClass,
-                                   Class valClass, Configuration conf, Reporter reporter) 
+                                   RawComparator<KEY> comparator, 
+                                   Class<KEY> keyClass,
+                                   Class<VALUE> valClass, Configuration conf, 
+                                   Reporter reporter) 
         throws IOException {
         super(in, comparator, keyClass, valClass, conf, reporter);
       }
       
-      public Object next() {
+      public VALUE next() {
         combineInputCounter.increment(1);
         return super.next();
       }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Wed Mar 12 21:15:29 2008
@@ -34,6 +34,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
@@ -51,7 +52,6 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.ChecksumFileSystem;
-import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.InputBuffer;
 import org.apache.hadoop.io.IntWritable;
@@ -67,6 +67,7 @@
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -184,50 +185,60 @@
   }
 
   /** Iterates values while keys match in sorted input. */
-  static class ValuesIterator implements Iterator {
+  static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
     private SequenceFile.Sorter.RawKeyValueIterator in; //input iterator
-    private Object key;               // current key
-    private Object value;                       // current value
+    private KEY key;               // current key
+    private KEY nextKey;
+    private VALUE value;             // current value
     private boolean hasNext;                      // more w/ this key
     private boolean more;                         // more in file
-    private RawComparator comparator;
-    private DataOutputBuffer valOut = new DataOutputBuffer();
+    private RawComparator<KEY> comparator;
+    private DataOutputBuffer nextValue = new DataOutputBuffer();
     private InputBuffer valIn = new InputBuffer();
     private InputBuffer keyIn = new InputBuffer();
-    protected Reporter reporter;
-    private Deserializer keyDeserializer;
-    private Deserializer valDeserializer;
+    protected Progressable reporter;
+    private Deserializer<KEY> keyDeserializer;
+    private Deserializer<VALUE> valDeserializer;
 
     @SuppressWarnings("unchecked")
     public ValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in, 
-                           RawComparator comparator, Class keyClass,
-                           Class valClass, Configuration conf, 
-                           Reporter reporter)
+                           RawComparator<KEY> comparator, 
+                           Class<KEY> keyClass,
+                           Class<VALUE> valClass, Configuration conf, 
+                           Progressable reporter)
       throws IOException {
       this.in = in;
       this.comparator = comparator;
       this.reporter = reporter;
+      nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf);
+      value = (VALUE) ReflectionUtils.newInstance(valClass, conf);
       SerializationFactory serializationFactory = new SerializationFactory(conf);
       this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
       this.keyDeserializer.open(keyIn);
       this.valDeserializer = serializationFactory.getDeserializer(valClass);
       this.valDeserializer.open(valIn);
-      getNext();
+      readNextKey();
+      key = nextKey;
+      nextKey = (KEY) ReflectionUtils.newInstance(keyClass, conf);
+      hasNext = more;
     }
 
     /// Iterator methods
 
     public boolean hasNext() { return hasNext; }
 
-    public Object next() {
-      Object result = value;                      // save value
+    public VALUE next() {
+      if (!hasNext) {
+        throw new NoSuchElementException("iterate past last value");
+      }
       try {
-        getNext();                                  // move to next
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+        readNextValue();
+        readNextKey();
+      } catch (IOException ie) {
+        throw new RuntimeException("problem advancing", ie);
       }
       reporter.progress();
-      return result;                              // return saved value
+      return value;
     }
 
     public void remove() { throw new RuntimeException("not implemented"); }
@@ -235,45 +246,62 @@
     /// Auxiliary methods
 
     /** Start processing next unique key. */
-    public void nextKey() {
-      while (hasNext) { next(); }                 // skip any unread
+    public void nextKey() throws IOException {
+      // read until we find a new key
+      while (hasNext) { 
+        readNextKey();
+      }
+      // move the next key to the current one
+      KEY tmpKey = key;
+      key = nextKey;
+      nextKey = tmpKey;
       hasNext = more;
     }
 
     /** True iff more keys remain. */
-    public boolean more() { return more; }
+    public boolean more() { 
+      return more; 
+    }
 
     /** The current key. */
-    public Object getKey() { return key; }
+    public Object getKey() { 
+      return key; 
+    }
 
-    @SuppressWarnings("unchecked")
-    private void getNext() throws IOException {
-      Object lastKey = key;                     // save previous key
+    /** 
+     * read the next key 
+     */
+    private void readNextKey() throws IOException {
       more = in.next();
       if (more) {
-        //de-serialize the raw key/value
-        keyIn.reset(in.getKey().getData(), in.getKey().getLength());
-        key = keyDeserializer.deserialize(null); // force new object
-        valOut.reset();
-        (in.getValue()).writeUncompressedBytes(valOut);
-        valIn.reset(valOut.getData(), valOut.getLength());
-        value = valDeserializer.deserialize(null); // force new object
-
-        if (lastKey == null) {
-          hasNext = true;
-        } else {
-          hasNext = (comparator.compare(key, lastKey) == 0);
-        }
+        DataOutputBuffer nextKeyBytes = in.getKey();
+        keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getLength());
+        keyDeserializer.deserialize(nextKey);
+        hasNext = key != null && (comparator.compare(key, nextKey) == 0);
       } else {
         hasNext = false;
       }
     }
+
+    /**
+     * Read the next value
+     * @throws IOException
+     */
+    private void readNextValue() throws IOException {
+      nextValue.reset();
+      in.getValue().writeUncompressedBytes(nextValue);
+      valIn.reset(nextValue.getData(), nextValue.getLength());
+      valDeserializer.deserialize(value);
+    }
   }
-  private class ReduceValuesIterator extends ValuesIterator {
+
+  private class ReduceValuesIterator<KEY,VALUE> 
+          extends ValuesIterator<KEY,VALUE> {
     public ReduceValuesIterator (SequenceFile.Sorter.RawKeyValueIterator in,
-                                 RawComparator comparator, Class keyClass,
-                                 Class valClass,
-                                 Configuration conf, Reporter reporter)
+                                 RawComparator<KEY> comparator, 
+                                 Class<KEY> keyClass,
+                                 Class<VALUE> valClass,
+                                 Configuration conf, Progressable reporter)
       throws IOException {
       super(in, comparator, keyClass, valClass, conf, reporter);
     }
@@ -281,7 +309,7 @@
       reducePhase.set(super.in.getProgress().get()); // update progress
       reporter.progress();
     }
-    public Object next() {
+    public VALUE next() {
       reduceInputValueCounter.increment(1);
       return super.next();
     }
@@ -809,7 +837,7 @@
             return CopyResult.OBSOLETE;
           }
           
-          bytes = fs.getLength(tmpFilename);
+          bytes = fs.getFileStatus(tmpFilename).getLen();
           //resolve the final filename against the directory where the tmpFile
           //got created
           filename = new Path(tmpFilename.getParent(), filename.getName());
@@ -1065,7 +1093,7 @@
             // all reduce-tasks swamping the same tasktracker
             Collections.shuffle(knownOutputs, this.random);
             
-            Iterator locIt = knownOutputs.iterator();
+            Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
             
             currentTime = System.currentTimeMillis();
             while (locIt.hasNext()) {
@@ -1255,7 +1283,7 @@
                 // the failure is due to a lost tasktracker (causes many
                 // unnecessary backoffs). If not, we only take a small hit
                 // polling the tasktracker a few more times
-                Iterator locIt = knownOutputs.iterator();
+                Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
                 while (locIt.hasNext()) {
                   MapOutputLocation loc = (MapOutputLocation)locIt.next();
                   if (cr.getHost().equals(loc.getHost())) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate/UniqValueCount.java Wed Mar 12 21:15:29 2008
@@ -80,7 +80,7 @@
    */
   public void addNextValue(Object val) {
     if (this.numItems <= this.maxNumItems) {
-      uniqItems.put(val, "1");
+      uniqItems.put(val.toString(), "1");
       this.numItems = this.uniqItems.size();
     }
   }
@@ -122,4 +122,4 @@
     }
     return retv;
   }
-}
\ No newline at end of file
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java?rev=636623&r1=636622&r2=636623&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MRCaching.java Wed Mar 12 21:15:29 2008
@@ -218,8 +218,8 @@
     // match the real string. check if there are 3 instances or not.
     Path result = new Path(TEST_ROOT_DIR + "/test.txt");
     {
-      BufferedReader file = new BufferedReader(new InputStreamReader(
-                                                                     FileSystem.getLocal(conf).open(result)));
+      BufferedReader file = new BufferedReader
+         (new InputStreamReader(FileSystem.getLocal(conf).open(result)));
       String line = file.readLine();
       while (line != null) {
         if (!testStr.equals(line))

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java?rev=636623&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceTask.java Wed Mar 12 21:15:29 2008
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * This test exercises the ValueIterator.
+ */
+public class TestReduceTask extends TestCase {
+
+  static class NullProgress implements Progressable {
+    public void progress() { }
+  }
+
+  private static class Pair {
+    String key;
+    String value;
+    Pair(String k, String v) {
+      key = k;
+      value = v;
+    }
+  }
+  private static Pair[][] testCases =
+    new Pair[][]{
+      new Pair[]{
+                 new Pair("k1", "v1"),
+                 new Pair("k2", "v2"),
+                 new Pair("k3", "v3"),
+                 new Pair("k3", "v4"),
+                 new Pair("k4", "v5"),
+                 new Pair("k5", "v6"),
+      },
+      new Pair[]{
+                 new Pair("", "v1"),
+                 new Pair("k1", "v2"),
+                 new Pair("k2", "v3"),
+                 new Pair("k2", "v4"),
+      },
+      new Pair[] {},
+      new Pair[]{
+                 new Pair("k1", "v1"),
+                 new Pair("k1", "v2"),
+                 new Pair("k1", "v3"),
+                 new Pair("k1", "v4"),
+      }
+    };
+  
+  public void runValueIterator(Path tmpDir, Pair[] vals, 
+                               Configuration conf) throws IOException {
+    FileSystem fs = tmpDir.getFileSystem(conf);
+    Path path = new Path(tmpDir, "data.in");
+    SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path,
+                                                         Text.class, 
+                                                         Text.class);
+    for(Pair p: vals) {
+      writer.append(new Text(p.key), new Text(p.value));
+    }
+    writer.close();
+    SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, 
+                                                         Text.class, conf);
+    SequenceFile.Sorter.RawKeyValueIterator rawItr = 
+      sorter.merge(new Path[]{path}, false, tmpDir);
+    ReduceTask.ValuesIterator valItr = 
+      new ReduceTask.ValuesIterator(rawItr, WritableComparator.get(Text.class), 
+                                    Text.class, Text.class,
+                                    conf, new NullProgress());
+    int i = 0;
+    while (valItr.more()) {
+      Object key = valItr.getKey();
+      String keyString = key.toString();
+      // make sure it matches!
+      assertEquals(vals[i].key, keyString);
+      // must have at least 1 value!
+      assertTrue(valItr.hasNext());
+      while (valItr.hasNext()) {
+        String valueString = valItr.next().toString();
+        // make sure the values match
+        assertEquals(vals[i].value, valueString);
+        // make sure the keys match
+        assertEquals(vals[i].key, valItr.getKey().toString());
+        i += 1;
+      }
+      // make sure the key hasn't changed under the hood
+      assertEquals(keyString, valItr.getKey().toString());
+      valItr.nextKey();
+    }
+    assertEquals(vals.length, i);
+  }
+
+  public void testValueIterator() throws Exception {
+    Path tmpDir = new Path("build/test/test.reduce.task");
+    Configuration conf = new Configuration();
+    for (Pair[] testCase: testCases) {
+      runValueIterator(tmpDir, testCase, conf);
+    }
+  }
+}