Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/09 15:18:44 UTC

svn commit: r693460 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/conf/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/conf/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoo...

Author: ddas
Date: Tue Sep  9 06:18:43 2008
New Revision: 693460

URL: http://svn.apache.org/viewvc?rev=693460&view=rev
Log:
HADOOP-3702. Adds ChainMapper and ChainReducer classes that allow composing chains of Maps and Reduces in a single Map/Reduce job, something like MAP+ / REDUCE MAP*. Contributed by Alejandro Abdelnur.
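
In pattern terms: one or more Mappers run before the shuffle, the Reducer runs, and zero or more Mappers run after it. A compressed sketch of a driver using the new API (AMap, BMap and XReduce are placeholder classes; input/output formats and paths are omitted; the full pattern appears in the ChainMapper/ChainReducer javadoc below):

  JobConf conf = new JobConf();
  ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
      Text.class, Text.class, true, null);         // MAP+
  ChainReducer.setReducer(conf, XReduce.class, Text.class, Text.class,
      Text.class, Text.class, true, null);         // REDUCE
  ChainReducer.addMapper(conf, BMap.class, Text.class, Text.class,
      Text.class, Text.class, false, null);        // MAP*
  JobClient.runJob(conf);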

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
    hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693460&r1=693459&r2=693460&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep  9 06:18:43 2008
@@ -141,6 +141,10 @@
     HADOOP-3970. Provides a way to recover counters written to JobHistory.
     (Amar Kamat via ddas)
 
+    HADOOP-3702. Adds ChainMapper and ChainReducer classes that allow
+    composing chains of Maps and Reduces in a single Map/Reduce job,
+    something like MAP+ / REDUCE MAP*. (Alejandro Abdelnur via ddas)
+
   IMPROVEMENTS
 
     HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java?rev=693460&r1=693459&r2=693460&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java Tue Sep  9 06:18:43 2008
@@ -28,6 +28,7 @@
 import java.io.Reader;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -40,10 +41,10 @@
 import java.util.StringTokenizer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.Collection;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
 import javax.xml.transform.Transformer;
 import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMSource;
@@ -54,11 +55,13 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
+import org.w3c.dom.DOMException;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
 
 /** 
  * Provides access to configuration parameters.
@@ -234,6 +237,19 @@
   }
 
   /**
+   * Add a configuration resource. 
+   * 
+   * The properties of this resource will override properties of previously 
+   * added resources, unless they were marked <a href="#Final">final</a>. 
+   * 
+   * @param in InputStream to deserialize the object from. 
+   */
+  public void addResource(InputStream in) {
+    addResourceObject(in);
+  }
+  
+  
+  /**
    * Reload configuration from previously added resources.
    *
    * This method will clear all the configuration read from the added 
@@ -554,6 +570,7 @@
       return false;
     }
     
+    @Override
     public String toString() {
       StringBuffer result = new StringBuffer();
       boolean first = true;
@@ -874,6 +891,23 @@
   }
 
   /**
+   * Return the number of keys in the configuration.
+   *
+   * @return number of keys in the configuration.
+   */
+  public int size() {
+    return getProps().size();
+  }
+
+  /**
+   * Clears all keys from the configuration.
+   */
+  public void clear() {
+    getProps().clear();
+    getOverlay().clear();
+  }
+
+  /**
    * Get an {@link Iterator} to go through the list of <code>String</code> 
    * key-value pairs in the configuration.
    * 
@@ -944,6 +978,12 @@
             in.close();
           }
         }
+      } else if (name instanceof InputStream) {
+        try {
+          doc = builder.parse((InputStream)name);
+        } finally {
+          ((InputStream)name).close();
+        }
       }
 
       if (doc == null) {
@@ -993,11 +1033,19 @@
         }
       }
         
-    } catch (Exception e) {
+    } catch (IOException e) {
+      LOG.fatal("error parsing conf file: " + e);
+      throw new RuntimeException(e);
+    } catch (DOMException e) {
+      LOG.fatal("error parsing conf file: " + e);
+      throw new RuntimeException(e);
+    } catch (SAXException e) {
+      LOG.fatal("error parsing conf file: " + e);
+      throw new RuntimeException(e);
+    } catch (ParserConfigurationException e) {
       LOG.fatal("error parsing conf file: " + e);
       throw new RuntimeException(e);
     }
-    
   }
 
   /** 

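The new Configuration.addResource(InputStream) overload added above can be exercised roughly like this (a minimal sketch; the XML payload and variable names are illustrative):

  Configuration conf = new Configuration(false);
  String xml = "<?xml version=\"1.0\"?><configuration>"
      + "<property><name>a</name><value>A</value></property>"
      + "</configuration>";
  conf.addResource(new java.io.ByteArrayInputStream(xml.getBytes()));
  // loading is lazy: the stream is read and closed the first time a
  // property is looked up
  String a = conf.get("a");  // "A"
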
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java Tue Sep  9 06:18:43 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * WritableJobConf is the Writable version of JobConf.
+ */
+public class WritableJobConf extends JobConf implements Writable {
+
+  /**
+   * This constructor does not load the default configuration files,
+   * hadoop-(default|site).xml.
+   */
+  public WritableJobConf() {
+    super(false); //do not load defaults
+  }
+
+  public WritableJobConf(boolean loadDefaults) {
+    super(loadDefaults);    
+  }
+
+  public WritableJobConf(Class<?> exampleClass) {
+    super(exampleClass);
+  }
+
+  public WritableJobConf(Configuration conf, Class<?> exampleClass) {
+    super(conf, exampleClass);
+  }
+
+  public WritableJobConf(Configuration conf) {
+    super(conf);
+  }
+
+  public WritableJobConf(Path config) {
+    super(config);
+  }
+
+  public WritableJobConf(String config) {
+    super(config);
+  }
+
+  @Override
+  public void readFields(final DataInput in) throws IOException {
+    if(in instanceof InputStream) {
+      this.addResource((InputStream)in);
+    } else {
+      this.addResource(new InputStream() {
+        @Override
+        public int read() throws IOException {
+          return in.readByte();
+        }
+      });
+    }
+    
+    this.get("foo"); //so that getProps() is called, before returining back.
+  }
+
+  @Override
+  public void write(final DataOutput out) throws IOException {
+    if(out instanceof OutputStream) {
+      write((OutputStream)out);
+    }
+    else {
+      write(new OutputStream() {
+        @Override
+        public void write(int b) throws IOException {
+          out.writeByte(b);
+        }
+      });
+    }
+  }
+
+}

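A round trip through the Writable interface looks roughly like this (a minimal sketch, inside a method declared to throw IOException; the buffer classes are org.apache.hadoop.io.DataOutputBuffer/DataInputBuffer):

  WritableJobConf conf = new WritableJobConf();
  conf.set("mapper.param", "value");
  DataOutputBuffer out = new DataOutputBuffer();
  conf.write(out);      // DataOutputBuffer is an OutputStream: fast path
  DataInputBuffer in = new DataInputBuffer();
  in.reset(out.getData(), out.getLength());
  WritableJobConf copy = new WritableJobConf();
  copy.readFields(in);  // DataInputBuffer is an InputStream; the conf XML is
                        // parsed before readFields returns
  assert "value".equals(copy.get("mapper.param"));
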
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java Tue Sep  9 06:18:43 2008
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.GenericsUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * The Chain class provides all the common functionality for the
+ * {@link ChainMapper} and the {@link ChainReducer} classes.
+ */
+class Chain {
+  private static final String CHAIN_MAPPER = "chain.mapper";
+  private static final String CHAIN_REDUCER = "chain.reducer";
+
+  private static final String CHAIN_MAPPER_SIZE = ".size";
+  private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
+  private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
+  private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
+  private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
+
+  private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
+  private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
+
+  private static final String MAPPER_INPUT_KEY_CLASS =
+    "chain.mapper.input.key.class";
+  private static final String MAPPER_INPUT_VALUE_CLASS =
+    "chain.mapper.input.value.class";
+  private static final String MAPPER_OUTPUT_KEY_CLASS =
+    "chain.mapper.output.key.class";
+  private static final String MAPPER_OUTPUT_VALUE_CLASS =
+    "chain.mapper.output.value.class";
+  private static final String REDUCER_INPUT_KEY_CLASS =
+    "chain.reducer.input.key.class";
+  private static final String REDUCER_INPUT_VALUE_CLASS =
+    "chain.reducer.input.value.class";
+  private static final String REDUCER_OUTPUT_KEY_CLASS =
+    "chain.reducer.output.key.class";
+  private static final String REDUCER_OUTPUT_VALUE_CLASS =
+    "chain.reducer.output.value.class";
+
+  private boolean isMap;
+
+  private JobConf chainJobConf;
+
+  private List<Mapper> mappers = new ArrayList<Mapper>();
+  private Reducer reducer;
+
+  // to cache the key/value output class serializations for each chain element
+  // to avoid looking them up every time.
+  private List<Serialization> mappersKeySerialization =
+    new ArrayList<Serialization>();
+  private List<Serialization> mappersValueSerialization =
+    new ArrayList<Serialization>();
+  private Serialization reducerKeySerialization;
+  private Serialization reducerValueSerialization;
+
+  /**
+   * Creates a Chain instance configured for a Mapper or a Reducer.
+   *
+   * @param isMap TRUE indicates the chain is for a Mapper, FALSE that it is
+   *              for a Reducer.
+   */
+  Chain(boolean isMap) {
+    this.isMap = isMap;
+  }
+
+  /**
+   * Returns the prefix to use for the configuration of the chain depending
+   * if it is for a Mapper or a Reducer.
+   *
+   * @param isMap TRUE for Mapper, FALSE for Reducer.
+   * @return the prefix to use.
+   */
+  private static String getPrefix(boolean isMap) {
+    return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
+  }
+
+  /**
+   * Creates a {@link JobConf} for one of the Mappers or the Reducer in the chain.
+   * <p/>
+   * It creates a new JobConf using the chain job's JobConf as base and adds to
+   * it the configuration properties for the chain element. The keys of the
+   * chain element jobConf have precedence over the given JobConf.
+   *
+   * @param jobConf the chain job's JobConf.
+   * @param confKey the key for chain element configuration serialized in the
+   *                chain job's JobConf.
+   * @return a new JobConf aggregating the chain job's JobConf with the chain
+   *         element configuration properties.
+   */
+  private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
+    JobConf conf;
+    try {
+      Stringifier<WritableJobConf> stringifier =
+        new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+      conf = stringifier.fromString(jobConf.get(confKey, null));
+    } catch (IOException ioex) {
+      throw new RuntimeException(ioex);
+    }
+    // we have to do this because the Writable deserialization clears all
+    // values set in the conf, making it impossible to do a new
+    // JobConf(jobConf) in the creation of the conf above
+    jobConf = new JobConf(jobConf);
+
+    for(Map.Entry<String, String> entry : conf) {
+      jobConf.set(entry.getKey(), entry.getValue());
+    }
+    return jobConf;
+  }
+
+  /**
+   * Adds a Mapper class to the chain job's JobConf.
+   * <p/>
+   * The configuration properties of the chain job have precedence over the
+   * configuration properties of the Mapper.
+   *
+   * @param isMap            indicates if the Chain is for a Mapper or for a
+   * Reducer.
+   * @param jobConf              chain job's JobConf to add the Mapper class.
+   * @param klass            the Mapper class to add.
+   * @param inputKeyClass    mapper input key class.
+   * @param inputValueClass  mapper input value class.
+   * @param outputKeyClass   mapper output key class.
+   * @param outputValueClass mapper output value class.
+   * @param byValue          indicates if key/values should be passed by value
+   * to the next Mapper in the chain, if any.
+   * @param mapperConf       a JobConf with the configuration for the Mapper
+   * class. It is recommended to use a JobConf without default values using the
+   * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+   */
+  public static <K1, V1, K2, V2> void addMapper(boolean isMap, JobConf jobConf,
+                           Class<? extends Mapper<K1, V1, K2, V2>> klass,
+                           Class<? extends K1> inputKeyClass,
+                           Class<? extends V1> inputValueClass,
+                           Class<? extends K2> outputKeyClass,
+                           Class<? extends V2> outputValueClass,
+                           boolean byValue, JobConf mapperConf) {
+    String prefix = getPrefix(isMap);
+
+    // if it is a reducer chain, check that the Reducer has already been set
+    if (!isMap) {
+      if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
+                           null) == null) {
+        throw new IllegalStateException(
+          "A Mapper can be added to the chain only after the Reducer has " +
+          "been set");
+      }
+    }
+    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+    jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
+
+    // if it is a reducer chain and the first Mapper is being added check the
+    // key and value input classes of the mapper match those of the reducer
+    // output.
+    if (!isMap && index == 0) {
+      JobConf reducerConf =
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      if (! inputKeyClass.isAssignableFrom(
+        reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output key class does" +
+          " not match the Mapper input key class");
+      }
+      if (! inputValueClass.isAssignableFrom(
+        reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Reducer output value class" +
+          " does not match the Mapper input value class");
+      }
+    } else if (index > 0) {
+      // check that the new Mapper in the chain has key and value input
+      // classes that match those of the previous Mapper's output.
+      JobConf previousMapperConf =
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
+          (index - 1));
+      if (! inputKeyClass.isAssignableFrom(
+        previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper output key class does" +
+          " not match the previous Mapper input key class");
+      }
+      if (! inputValueClass.isAssignableFrom(
+        previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
+        throw new IllegalArgumentException("The Mapper output value class" +
+          " does not match the previous Mapper input value class");
+      }
+    }
+
+    // if the Mapper does not have a private JobConf create an empty one
+    if (mapperConf == null) {
+      // using a JobConf without defaults to make it lightweight.
+      // the chain JobConf may still have all defaults; this conf is
+      // overlaid on the chain JobConf.
+      mapperConf = new JobConf(false);
+    }
+
+    // store in the private mapper conf the input/output classes of the mapper
+    // and if it works by value or by reference
+    mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
+    mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
+                        Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
+    mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
+                        Object.class);
+
+    // serialize the private mapper jobconf in the chain jobconf.
+    Stringifier<WritableJobConf> stringifier =
+      new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+    try {
+      jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
+                  stringifier.toString(new WritableJobConf(mapperConf)));
+    }
+    catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+
+    // increment the chain counter
+    jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
+  }
+
+  /**
+   * Sets the Reducer class to the chain job's JobConf.
+   * <p/>
+   * The configuration properties of the chain job have precedence over the
+   * configuration properties of the Reducer.
+   *
+   * @param jobConf              chain job's JobConf to add the Reducer class.
+   * @param klass            the Reducer class to add.
+   * @param inputKeyClass    reducer input key class.
+   * @param inputValueClass  reducer input value class.
+   * @param outputKeyClass   reducer output key class.
+   * @param outputValueClass reducer output value class.
+   * @param byValue          indicates if key/values should be passed by value
+   * to the next Mapper in the chain, if any.
+   * @param reducerConf      a JobConf with the configuration for the Reducer
+   * class. It is recommended to use a JobConf without default values using the
+   * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+   */
+  public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
+                          Class<? extends Reducer<K1, V1, K2, V2>> klass,
+                          Class<? extends K1> inputKeyClass,
+                          Class<? extends V1> inputValueClass,
+                          Class<? extends K2> outputKeyClass,
+                          Class<? extends V2> outputValueClass,
+                          boolean byValue, JobConf reducerConf) {
+    String prefix = getPrefix(false);
+
+    if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
+      throw new IllegalStateException("Reducer has been already set");
+    }
+
+    jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
+
+    // if the Reducer does not have a private JobConf create an empty one
+    if (reducerConf == null) {
+      // using a JobConf without defaults to make it lightweight.
+      // the chain JobConf may still have all defaults; this conf is
+      // overlaid on the chain JobConf.
+      reducerConf = new JobConf(false);
+    }
+
+    // store in the private reducer conf the input/output classes of the reducer
+    // and if it works by value or by reference
+    reducerConf.setBoolean(REDUCER_BY_VALUE, byValue);
+    reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+    reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
+                         Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
+                         Object.class);
+    reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
+                         Object.class);
+
+    // serialize the private reducer jobconf in the chain jobconf.
+    Stringifier<WritableJobConf> stringifier =
+      new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+    try {
+      jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
+                  stringifier.toString(new WritableJobConf(reducerConf)));
+    }
+    catch (IOException ioEx) {
+      throw new RuntimeException(ioEx);
+    }
+  }
+
+  /**
+   * Configures all the chain elements for the task.
+   *
+   * @param jobConf chain job's JobConf.
+   */
+  public void configure(JobConf jobConf) {
+    String prefix = getPrefix(isMap);
+    chainJobConf = jobConf;
+    SerializationFactory serializationFactory =
+      new SerializationFactory(chainJobConf);
+    int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+    for (int i = 0; i < index; i++) {
+      Class<? extends Mapper> klass =
+        jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
+      JobConf mConf =
+        getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
+      Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
+      mappers.add(mapper);
+
+      if (mConf.getBoolean(MAPPER_BY_VALUE, true)) {
+        mappersKeySerialization.add(serializationFactory.getSerialization(
+          mConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null)));
+        mappersValueSerialization.add(serializationFactory.getSerialization(
+          mConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null)));
+      } else {
+        mappersKeySerialization.add(null);
+        mappersValueSerialization.add(null);
+      }
+    }
+    Class<? extends Reducer> klass =
+      jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
+    if (klass != null) {
+      JobConf rConf =
+        getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+      reducer = ReflectionUtils.newInstance(klass, rConf);
+      if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
+        reducerKeySerialization = serializationFactory
+          .getSerialization(rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null));
+        reducerValueSerialization = serializationFactory
+          .getSerialization(rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null));
+      } else {
+        reducerKeySerialization = null;
+        reducerValueSerialization = null;
+      }
+    }
+  }
+
+  /**
+   * Returns the chain job conf.
+   *
+   * @return the chain job conf.
+   */
+  protected JobConf getChainJobConf() {
+    return chainJobConf;
+  }
+
+  /**
+   * Returns the first Mapper instance in the chain.
+   *
+   * @return the first Mapper instance in the chain or NULL if none.
+   */
+  public Mapper getFirstMap() {
+    return (mappers.size() > 0) ? mappers.get(0) : null;
+  }
+
+  /**
+   * Returns the Reducer instance in the chain.
+   *
+   * @return the Reducer instance in the chain or NULL if none.
+   */
+  public Reducer getReducer() {
+    return reducer;
+  }
+
+  /**
+   * Returns the OutputCollector to be used by a Mapper instance in the chain.
+   *
+   * @param mapperIndex index of the Mapper instance to get the OutputCollector.
+   * @param output      the original OutputCollector of the task.
+   * @param reporter    the reporter of the task.
+   * @return the OutputCollector to be used in the chain.
+   */
+  @SuppressWarnings({"unchecked"})
+  public OutputCollector getMapperCollector(int mapperIndex,
+                                            OutputCollector output,
+                                            Reporter reporter) {
+    Serialization keySerialization = mappersKeySerialization.get(mapperIndex);
+    Serialization valueSerialization =
+      mappersValueSerialization.get(mapperIndex);
+    return new ChainOutputCollector(mapperIndex, keySerialization,
+                                    valueSerialization, output, reporter);
+  }
+
+  /**
+   * Returns the OutputCollector to be used by a Mapper instance in the chain.
+   *
+   * @param output   the original OutputCollector of the task.
+   * @param reporter the reporter of the task.
+   * @return the OutputCollector to be used in the chain.
+   */
+  @SuppressWarnings({"unchecked"})
+  public OutputCollector getReducerCollector(OutputCollector output,
+                                             Reporter reporter) {
+    return new ChainOutputCollector(reducerKeySerialization,
+                                    reducerValueSerialization, output,
+                                    reporter);
+  }
+
+  /**
+   * Closes all the chain elements.
+   *
+   * @throws IOException thrown if any of the chain elements threw an
+   *                     IOException exception.
+   */
+  public void close() throws IOException {
+    for (Mapper map : mappers) {
+      map.close();
+    }
+    if (reducer != null) {
+      reducer.close();
+    }
+  }
+
+  // using a ThreadLocal to reuse the DataOutputBuffer used for ser/deser;
+  // it has to be a thread local because otherwise it would break when used
+  // from a MultiThreadedMapRunner.
+  private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
+    new ThreadLocal<DataOutputBuffer>() {
+      protected DataOutputBuffer initialValue() {
+        return new DataOutputBuffer(1024);
+      }
+    };
+
+  /**
+   * OutputCollector implementation used by the chain tasks.
+   * <p/>
+   * If it is not the end of the chain, a {@link #collect} invocation invokes
+   * the next Mapper in the chain. If it is the end of the chain the task
+   * OutputCollector is called.
+   */
+  private class ChainOutputCollector<K, V> implements OutputCollector<K, V> {
+    private int nextMapperIndex;
+    private Serialization<K> keySerialization;
+    private Serialization<V> valueSerialization;
+    private OutputCollector output;
+    private Reporter reporter;
+
+    /*
+     * Constructor for Mappers
+     */
+    public ChainOutputCollector(int index, Serialization<K> keySerialization,
+                                Serialization<V> valueSerialization,
+                                OutputCollector output, Reporter reporter) {
+      this.nextMapperIndex = index + 1;
+      this.keySerialization = keySerialization;
+      this.valueSerialization = valueSerialization;
+      this.output = output;
+      this.reporter = reporter;
+    }
+
+    /*
+     * Constructor for Reducer
+     */
+    public ChainOutputCollector(Serialization<K> keySerialization,
+                                Serialization<V> valueSerialization,
+                                OutputCollector output, Reporter reporter) {
+      this.nextMapperIndex = 0;
+      this.keySerialization = keySerialization;
+      this.valueSerialization = valueSerialization;
+      this.output = output;
+      this.reporter = reporter;
+    }
+
+    @SuppressWarnings({"unchecked"})
+    public void collect(K key, V value) throws IOException {
+      if (nextMapperIndex < mappers.size()) {
+        // there is a next mapper in chain
+
+        // only need to ser/deser if there is next mapper in the chain
+        if (keySerialization != null) {
+          key = makeCopyForPassByValue(keySerialization, key);
+          value = makeCopyForPassByValue(valueSerialization, value);
+        }
+
+        // gets ser/deser and mapper of next in chain
+        Serialization nextKeySerialization =
+          mappersKeySerialization.get(nextMapperIndex);
+        Serialization nextValueSerialization =
+          mappersValueSerialization.get(nextMapperIndex);
+        Mapper nextMapper = mappers.get(nextMapperIndex);
+
+        // invokes next mapper in chain
+        nextMapper.map(key, value,
+                       new ChainOutputCollector(nextMapperIndex,
+                                                nextKeySerialization,
+                                                nextValueSerialization,
+                                                output, reporter),
+                       reporter);
+      } else {
+        // end of chain, use the real output collector
+        output.collect(key, value);
+      }
+    }
+
+    private <E> E makeCopyForPassByValue(Serialization<E> serialization,
+                                          E obj) throws IOException {
+      Serializer<E> ser =
+        serialization.getSerializer(GenericsUtil.getClass(obj));
+      Deserializer<E> deser =
+        serialization.getDeserializer(GenericsUtil.getClass(obj));
+
+      DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
+
+      dof.reset();
+      ser.open(dof);
+      ser.serialize(obj);
+      ser.close();
+      obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
+                                        getChainJobConf());
+      ByteArrayInputStream bais =
+        new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
+      deser.open(bais);
+      deser.deserialize(obj);
+      deser.close();
+      return obj;
+    }
+
+  }
+
+}

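The per-element configuration trick above is the heart of Chain: each Mapper/Reducer conf is serialized to a string and stashed under a key in the chain job's own JobConf. A standalone sketch of that mechanism (error handling omitted; the key name matches the constants above for the first mapper in a map-side chain):

  JobConf chainConf = new JobConf();
  JobConf elementConf = new JobConf(false);
  elementConf.set("a", "A");
  Stringifier<WritableJobConf> stringifier =
      new DefaultStringifier<WritableJobConf>(chainConf, WritableJobConf.class);
  chainConf.set("chain.mapper.mapper.config.0",
      stringifier.toString(new WritableJobConf(elementConf)));
  // later, in the task, the element conf is recovered and overlaid on a
  // copy of the chain conf (see getChainElementConf above)
  JobConf recovered =
      stringifier.fromString(chainConf.get("chain.mapper.mapper.config.0"));
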
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java Tue Sep  9 06:18:43 2008
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * The ChainMapper class allows the use of multiple Mapper classes within a
+ * single Map task.
+ * <p/>
+ * The Mapper classes are invoked in a chained (or piped) fashion: the output
+ * of the first becomes the input of the second, and so on until the last
+ * Mapper, whose output is written to the task's output.
+ * <p/>
+ * The key functionality of this feature is that the Mappers in the chain do not
+ * need to be aware that they are executed in a chain. This enables having
+ * reusable specialized Mappers that can be combined to perform composite
+ * operations within a single task.
+ * <p/>
+ * Special care has to be taken when creating chains to ensure the key/values
+ * output by a Mapper are valid for the following Mapper in the chain. It is
+ * assumed all Mappers and the Reducer in the chain use matching output and
+ * input key and value classes, as no conversion is done by the chaining code.
+ * <p/>
+ * Using the ChainMapper and the ChainReducer classes it is possible to
+ * compose Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>.
+ * An immediate benefit of this pattern is a dramatic reduction in disk IO.
+ * <p/>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainMapper; this is done by addMapper for the last mapper in the chain.
+ * <p/>
+ * ChainMapper usage pattern:
+ * <p/>
+ * <pre>
+ * ...
+ * conf.setJobName("chain");
+ * conf.setInputFormat(TextInputFormat.class);
+ * conf.setOutputFormat(TextOutputFormat.class);
+ * <p/>
+ * JobConf mapAConf = new JobConf(false);
+ * ...
+ * ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
+ *   Text.class, Text.class, true, mapAConf);
+ * <p/>
+ * JobConf mapBConf = new JobConf(false);
+ * ...
+ * ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,
+ *   LongWritable.class, Text.class, false, mapBConf);
+ * <p/>
+ * JobConf reduceConf = new JobConf(false);
+ * ...
+ * ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,
+ *   Text.class, Text.class, true, reduceConf);
+ * <p/>
+ * ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,
+ *   LongWritable.class, Text.class, false, null);
+ * <p/>
+ * ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
+ *   LongWritable.class, LongWritable.class, true, null);
+ * <p/>
+ * FileInputFormat.setInputPaths(conf, inDir);
+ * FileOutputFormat.setOutputPath(conf, outDir);
+ * ...
+ * <p/>
+ * JobClient jc = new JobClient(conf);
+ * RunningJob job = jc.submitJob(conf);
+ * ...
+ * </pre>
+ */
+public class ChainMapper implements Mapper {
+
+  /**
+   * Adds a Mapper class to the chain job's JobConf.
+   * <p/>
+   * It has to be specified how key and values are passed from one element of
+   * the chain to the next: by value or by reference. If a Mapper relies on the
+   * assumption that the key and values are not modified by the collector,
+   * 'by value' must be used. If the Mapper does not rely on these semantics,
+   * 'by reference' can be used as an optimization to avoid serialization and
+   * deserialization.
+   * <p/>
+   * For the added Mapper, the configuration given for it,
+   * <code>mapperConf</code>, has precedence over the job's JobConf. This
+   * precedence is in effect when the task is running.
+   * <p/>
+   * IMPORTANT: There is no need to specify the output key/value classes for
+   * the ChainMapper; this is done by addMapper for the last mapper in the chain.
+   * <p/>
+   *
+   * @param job              job's JobConf to add the Mapper class.
+   * @param klass            the Mapper class to add.
+   * @param inputKeyClass    mapper input key class.
+   * @param inputValueClass  mapper input value class.
+   * @param outputKeyClass   mapper output key class.
+   * @param outputValueClass mapper output value class.
+   * @param byValue          indicates if key/values should be passed by value
+   * to the next Mapper in the chain, if any.
+   * @param mapperConf       a JobConf with the configuration for the Mapper
+   * class. It is recommended to use a JobConf without default values using the
+   * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+   */
+  public static <K1, V1, K2, V2> void addMapper(JobConf job,
+                           Class<? extends Mapper<K1, V1, K2, V2>> klass,
+                           Class<? extends K1> inputKeyClass,
+                           Class<? extends V1> inputValueClass,
+                           Class<? extends K2> outputKeyClass,
+                           Class<? extends V2> outputValueClass,
+                           boolean byValue, JobConf mapperConf) {
+    job.setMapperClass(ChainMapper.class);
+    job.setMapOutputKeyClass(outputKeyClass);
+    job.setMapOutputValueClass(outputValueClass);
+    Chain.addMapper(true, job, klass, inputKeyClass, inputValueClass,
+                    outputKeyClass, outputValueClass, byValue, mapperConf);
+  }
+
+  private Chain chain;
+
+  /**
+   * Constructor.
+   */
+  public ChainMapper() {
+    chain = new Chain(true);
+  }
+
+  /**
+   * Configures the ChainMapper and all the Mappers in the chain.
+   * <p/>
+   * If this method is overridden, <code>super.configure(...)</code> should be
+   * invoked at the beginning of the overriding method.
+   */
+  public void configure(JobConf job) {
+    chain.configure(job);
+  }
+
+  /**
+   * Chains the <code>map(...)</code> methods of the Mappers in the chain.
+   */
+  @SuppressWarnings({"unchecked"})
+  public void map(Object key, Object value, OutputCollector output,
+                  Reporter reporter) throws IOException {
+    Mapper mapper = chain.getFirstMap();
+    if (mapper != null) {
+      mapper.map(key, value, chain.getMapperCollector(0, output, reporter),
+                 reporter);
+    }
+  }
+
+  /**
+   * Closes the ChainMapper and all the Mappers in the chain.
+   * <p/>
+   * If this method is overridden, <code>super.close()</code> should be
+   * invoked at the end of the overriding method.
+   */
+  public void close() throws IOException {
+    chain.close();
+  }
+
+}

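Note that a chained Mapper is just an ordinary Mapper; nothing in its code needs to know about the chain. A hypothetical chain-friendly element (UppercaseMap is not part of this commit):

  public class UppercaseMap extends MapReduceBase
      implements Mapper<LongWritable, Text, LongWritable, Text> {
    public void map(LongWritable key, Text value,
                    OutputCollector<LongWritable, Text> output,
                    Reporter reporter) throws IOException {
      // transform and forward; at runtime the collector is Chain's
      // ChainOutputCollector, which pipes to the next element in the chain
      output.collect(key, new Text(value.toString().toUpperCase()));
    }
  }
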
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java Tue Sep  9 06:18:43 2008
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * The ChainReducer class allows chaining multiple Mapper classes after a
+ * Reducer within the Reducer task.
+ * <p/>
+ * For each record output by the Reducer, the Mapper classes are invoked in a
+ * chained (or piped) fashion: the output of the first becomes the input of the
+ * second, and so on until the last Mapper, whose output is written to the
+ * task's output.
+ * <p/>
+ * The key functionality of this feature is that the Mappers in the chain do not
+ * need to be aware that they are executed after the Reducer or in a chain.
+ * This enables having reusable specialized Mappers that can be combined to
+ * perform composite operations within a single task.
+ * <p/>
+ * Special care has to be taken when creating chains to ensure the key/values
+ * output by a Mapper are valid for the following Mapper in the chain. It is
+ * assumed all Mappers and the Reducer in the chain use matching output and
+ * input key and value classes, as no conversion is done by the chaining code.
+ * <p/>
+ * Using the ChainMapper and the ChainReducer classes it is possible to
+ * compose Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>.
+ * An immediate benefit of this pattern is a dramatic reduction in disk IO.
+ * <p/>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainReducer; this is done by setReducer or addMapper for the last element
+ * in the chain.
+ * <p/>
+ * ChainReducer usage pattern:
+ * <p/>
+ * <pre>
+ * ...
+ * conf.setJobName("chain");
+ * conf.setInputFormat(TextInputFormat.class);
+ * conf.setOutputFormat(TextOutputFormat.class);
+ * <p/>
+ * JobConf mapAConf = new JobConf(false);
+ * ...
+ * ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
+ *   Text.class, Text.class, true, mapAConf);
+ * <p/>
+ * JobConf mapBConf = new JobConf(false);
+ * ...
+ * ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,
+ *   LongWritable.class, Text.class, false, mapBConf);
+ * <p/>
+ * JobConf reduceConf = new JobConf(false);
+ * ...
+ * ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,
+ *   Text.class, Text.class, true, reduceConf);
+ * <p/>
+ * ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,
+ *   LongWritable.class, Text.class, false, null);
+ * <p/>
+ * ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
+ *   LongWritable.class, LongWritable.class, true, null);
+ * <p/>
+ * FileInputFormat.setInputPaths(conf, inDir);
+ * FileOutputFormat.setOutputPath(conf, outDir);
+ * ...
+ * <p/>
+ * JobClient jc = new JobClient(conf);
+ * RunningJob job = jc.submitJob(conf);
+ * ...
+ * </pre>
+ */
+public class ChainReducer implements Reducer {
+
+  /**
+   * Sets the Reducer class to the chain job's JobConf.
+   * <p/>
+   * It has to be specified how key and values are passed from one element of
+   * the chain to the next: by value or by reference. If a Reducer relies on
+   * the assumption that the key and values are not modified by the collector,
+   * 'by value' must be used. If the Reducer does not rely on these semantics,
+   * 'by reference' can be used as an optimization to avoid serialization and
+   * deserialization.
+   * <p/>
+   * For the added Reducer, the configuration given for it,
+   * <code>reducerConf</code>, has precedence over the job's JobConf. This
+   * precedence is in effect when the task is running.
+   * <p/>
+   * IMPORTANT: There is no need to specify the output key/value classes for
+   * the ChainReducer; this is done by setReducer or addMapper for the last
+   * element in the chain.
+   *
+   * @param job              job's JobConf to add the Reducer class.
+   * @param klass            the Reducer class to add.
+   * @param inputKeyClass    reducer input key class.
+   * @param inputValueClass  reducer input value class.
+   * @param outputKeyClass   reducer output key class.
+   * @param outputValueClass reducer output value class.
+   * @param byValue          indicates if key/values should be passed by value
+   * to the next Mapper in the chain, if any.
+   * @param reducerConf      a JobConf with the configuration for the Reducer
+   * class. It is recommended to use a JobConf without default values using the
+   * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+   */
+  public static <K1, V1, K2, V2> void setReducer(JobConf job,
+                           Class<? extends Reducer<K1, V1, K2, V2>> klass,
+                           Class<? extends K1> inputKeyClass,
+                           Class<? extends V1> inputValueClass,
+                           Class<? extends K2> outputKeyClass,
+                           Class<? extends V2> outputValueClass,
+                           boolean byValue, JobConf reducerConf) {
+    job.setReducerClass(ChainReducer.class);
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
+    Chain.setReducer(job, klass, inputKeyClass, inputValueClass, outputKeyClass,
+                     outputValueClass, byValue, reducerConf);
+  }
+
+  /**
+   * Adds a Mapper class to the chain job's JobConf.
+   * <p/>
+   * It has to be specified how key and values are passed from one element of
+   * the chain to the next: by value or by reference. If a Mapper relies on the
+   * assumption that the key and values are not modified by the collector,
+   * 'by value' must be used. If the Mapper does not rely on these semantics,
+   * 'by reference' can be used as an optimization to avoid serialization and
+   * deserialization.
+   * <p/>
+   * For the added Mapper, the configuration given for it,
+   * <code>mapperConf</code>, has precedence over the job's JobConf. This
+   * precedence is in effect when the task is running.
+   * <p/>
+   * IMPORTANT: There is no need to specify the output key/value classes for
+   * the ChainMapper; this is done by addMapper for the last mapper in the
+   * chain.
+   *
+   * @param job              chain job's JobConf to add the Mapper class.
+   * @param klass            the Mapper class to add.
+   * @param inputKeyClass    mapper input key class.
+   * @param inputValueClass  mapper input value class.
+   * @param outputKeyClass   mapper output key class.
+   * @param outputValueClass mapper output value class.
+   * @param byValue          indicates if key/values should be passed by value
+   * to the next Mapper in the chain, if any.
+   * @param mapperConf       a JobConf with the configuration for the Mapper
+   * class. It is recommended to use a JobConf without default values using the
+   * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+   */
+  public static <K1, V1, K2, V2> void addMapper(JobConf job,
+                           Class<? extends Mapper<K1, V1, K2, V2>> klass,
+                           Class<? extends K1> inputKeyClass,
+                           Class<? extends V1> inputValueClass,
+                           Class<? extends K2> outputKeyClass,
+                           Class<? extends V2> outputValueClass,
+                           boolean byValue, JobConf mapperConf) {
+    job.setOutputKeyClass(outputKeyClass);
+    job.setOutputValueClass(outputValueClass);
+    Chain.addMapper(false, job, klass, inputKeyClass, inputValueClass,
+                    outputKeyClass, outputValueClass, byValue, mapperConf);
+  }
+
+  private Chain chain;
+
+  /**
+   * Constructor.
+   */
+  public ChainReducer() {
+    chain = new Chain(false);
+  }
+
+  /**
+   * Configures the ChainReducer, the Reducer and all the Mappers in the chain.
+   * <p/>
+   * If this method is overridden, <code>super.configure(...)</code> should be
+   * invoked at the beginning of the overriding method.
+   */
+  public void configure(JobConf job) {
+    chain.configure(job);
+  }
+
+  /**
+   * Chains the <code>reduce(...)</code> method of the Reducer with the
+   * <code>map(...) </code> methods of the Mappers in the chain.
+   */
+  @SuppressWarnings({"unchecked"})
+  public void reduce(Object key, Iterator values, OutputCollector output,
+                     Reporter reporter) throws IOException {
+    Reducer reducer = chain.getReducer();
+    if (reducer != null) {
+      reducer.reduce(key, values, chain.getReducerCollector(output, reporter),
+                     reporter);
+    }
+  }
+
+  /**
+   * Closes the ChainReducer, the Reducer and all the Mappers in the chain.
+   * <p/>
+   * If this method is overridden, <code>super.close()</code> should be
+   * invoked at the end of the overriding method.
+   */
+  public void close() throws IOException {
+    chain.close();
+  }
+
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java?rev=693460&r1=693459&r2=693460&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java Tue Sep  9 06:18:43 2008
@@ -21,6 +21,10 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
 import java.util.ArrayList;
 import java.util.Random;
 
@@ -333,7 +337,23 @@
     assertEquals(null, conf.get("test.key2"));
     assertEquals("value5", conf.get("test.key4"));
   }
-  
+
+  public void testSize() throws IOException {
+    Configuration conf = new Configuration(false);
+    conf.set("a", "A");
+    conf.set("b", "B");
+    assertEquals(2, conf.size());
+  }
+
+  public void testClear() throws IOException {
+    Configuration conf = new Configuration(false);
+    conf.set("a", "A");
+    conf.set("b", "B");
+    conf.clear();
+    assertEquals(0, conf.size());
+    assertFalse(conf.iterator().hasNext());
+  }
+
   public static void main(String[] argv) throws Exception {
     junit.textui.TestRunner.main(new String[]{
       TestConfiguration.class.getName()

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java Tue Sep  9 06:18:43 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.GenericsUtil;
+
+public class TestWritableJobConf extends TestCase {
+
+  private static final Configuration CONF = new Configuration();
+
+  private <K> K serDeser(K conf) throws Exception {
+    SerializationFactory factory = new SerializationFactory(CONF);
+    Serializer<K> serializer =
+      factory.getSerializer(GenericsUtil.getClass(conf));
+    Deserializer<K> deserializer =
+      factory.getDeserializer(GenericsUtil.getClass(conf));
+
+    DataOutputBuffer out = new DataOutputBuffer();
+    serializer.open(out);
+    serializer.serialize(conf);
+    serializer.close();
+
+    DataInputBuffer in = new DataInputBuffer();
+    in.reset(out.getData(), out.getLength());
+    deserializer.open(in);
+    K after = deserializer.deserialize(null);
+    deserializer.close();
+    return after;
+  }
+
+  private void assertEquals(Configuration conf1, Configuration conf2) {
+    assertEquals(conf1.size(), conf2.size());
+
+    Iterator<Map.Entry<String, String>> iterator1 = conf1.iterator();
+    Map<String, String> map1 = new HashMap<String,String>();
+    while (iterator1.hasNext()) {
+      Map.Entry<String, String> entry = iterator1.next();
+      map1.put(entry.getKey(), entry.getValue());
+    }
+
+    Iterator<Map.Entry<String, String>> iterator2 = conf1.iterator();
+    Map<String, String> map2 = new HashMap<String,String>();
+    while (iterator2.hasNext()) {
+      Map.Entry<String, String> entry = iterator2.next();
+      map2.put(entry.getKey(), entry.getValue());
+    }
+
+    assertEquals(map1, map2);
+  }
+
+  public void testEmptyConfiguration() throws Exception {
+    WritableJobConf conf = new WritableJobConf();
+    Configuration deser = serDeser(conf);
+    assertEquals(conf, deser);
+  }
+
+  public void testNonEmptyConfiguration() throws Exception {
+    WritableJobConf conf = new WritableJobConf();
+    conf.set("a", "A");
+    conf.set("b", "B");
+    Configuration deser = serDeser(conf);
+    assertEquals(conf, deser);
+  }
+
+  public void testConfigurationWithDefaults() throws Exception {
+    WritableJobConf conf = new WritableJobConf(true);
+    conf.set("a", "A");
+    conf.set("b", "B");
+    Configuration deser = serDeser(conf);
+    assertEquals(conf, deser);
+  }
+  
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java Tue Sep  9 06:18:43 2008
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+public class TestChainMapReduce extends HadoopTestCase {
+
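+  // Each element of the chain records its lifecycle events (configure, one
+  // flag per map/reduce value, close) as empty "flag" files; testChain()
+  // asserts on their presence after the job completes.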
+  private static Path getFlagDir(boolean local) {
+    Path flagDir = new Path("testing/chain/flags");
+
+    // Hack for the local FS, which has no notion of a 'mounting point'
+    if (local) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+        .replace(' ', '+');
+      flagDir = new Path(localPathRoot, flagDir);
+    }
+    return flagDir;
+  }
+
+  private static void cleanFlags(JobConf conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(getFlagDir(conf.getBoolean("localFS", true)), true);
+    fs.mkdirs(getFlagDir(conf.getBoolean("localFS", true)));
+  }
+
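+  // Creates an empty flag file; a given flag may only be written once per run.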
+  private static void writeFlag(JobConf conf, String flag) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    if (getFlag(conf, flag)) {
+      fail("Flag " + flag + " already exists");
+    }
+    DataOutputStream file =
+      fs.create(new Path(getFlagDir(conf.getBoolean("localFS", true)), flag));
+    file.close();
+  }
+
+  private static boolean getFlag(JobConf conf, String flag) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    return fs
+      .exists(new Path(getFlagDir(conf.getBoolean("localFS", true)), flag));
+  }
+
+  public TestChainMapReduce() throws IOException {
+    super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+  }
+
+  public void testChain() throws Exception {
+    Path inDir = new Path("testing/chain/input");
+    Path outDir = new Path("testing/chain/output");
+
+    // Hack for the local FS, which has no notion of a 'mounting point'
+    if (isLocalFS()) {
+      String localPathRoot = System.getProperty("test.build.data", "/tmp")
+        .replace(' ', '+');
+      inDir = new Path(localPathRoot, inDir);
+      outDir = new Path(localPathRoot, outDir);
+    }
+
+    JobConf conf = createJobConf();
+    conf.setBoolean("localFS", isLocalFS());
+
+    cleanFlags(conf);
+
+    FileSystem fs = FileSystem.get(conf);
+
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes("1\n2\n");
+    file.close();
+
+    conf.setJobName("chain");
+    conf.setInputFormat(TextInputFormat.class);
+    conf.setOutputFormat(TextOutputFormat.class);
+
+    conf.set("a", "X");
+
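+    // Assemble the chain, MAP+ / REDUCE MAP*: AMap -> BMap in the map phase,
+    // CReduce as the reducer, then DMap -> EMap applied to the reduce output.
+    // AMap, CReduce and EMap each get a private JobConf overriding "a";
+    // BMap and DMap inherit "a" = "X" from the job configuration.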
+    JobConf mapAConf = new JobConf(false);
+    mapAConf.set("a", "A");
+    ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
+                          LongWritable.class, Text.class, true, mapAConf);
+
+    ChainMapper.addMapper(conf, BMap.class, LongWritable.class, Text.class,
+                          LongWritable.class, Text.class, false, null);
+
+    JobConf reduceConf = new JobConf(false);
+    reduceConf.set("a", "C");
+    ChainReducer.setReducer(conf, CReduce.class, LongWritable.class, Text.class,
+                            LongWritable.class, Text.class, true, reduceConf);
+
+    ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
+                           LongWritable.class, Text.class, false, null);
+
+    JobConf mapEConf = new JobConf(false);
+    mapEConf.set("a", "E");
+    ChainReducer.addMapper(conf, EMap.class, LongWritable.class, Text.class,
+                           LongWritable.class, Text.class, true, mapEConf);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+
+    JobClient jc = new JobClient(conf);
+    RunningJob job = jc.submitJob(conf);
+    while (!job.isComplete()) {
+      Thread.sleep(100);
+    }
+
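+    // Every element of the chain must have been configured, must have seen
+    // both input values, and must have been closed.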
+    assertTrue(getFlag(conf, "configure.A"));
+    assertTrue(getFlag(conf, "configure.B"));
+    assertTrue(getFlag(conf, "configure.C"));
+    assertTrue(getFlag(conf, "configure.D"));
+    assertTrue(getFlag(conf, "configure.E"));
+
+    assertTrue(getFlag(conf, "map.A.value.1"));
+    assertTrue(getFlag(conf, "map.A.value.2"));
+    assertTrue(getFlag(conf, "map.B.value.1"));
+    assertTrue(getFlag(conf, "map.B.value.2"));
+    assertTrue(getFlag(conf, "reduce.C.value.2"));
+    assertTrue(getFlag(conf, "reduce.C.value.1"));
+    assertTrue(getFlag(conf, "map.D.value.1"));
+    assertTrue(getFlag(conf, "map.D.value.2"));
+    assertTrue(getFlag(conf, "map.E.value.1"));
+    assertTrue(getFlag(conf, "map.E.value.2"));
+
+    assertTrue(getFlag(conf, "close.A"));
+    assertTrue(getFlag(conf, "close.B"));
+    assertTrue(getFlag(conf, "close.C"));
+    assertTrue(getFlag(conf, "close.D"));
+    assertTrue(getFlag(conf, "close.E"));
+  }
+
+  public static class AMap extends IDMap {
+    public AMap() {
+      super("A", "A", true);
+    }
+  }
+
+  public static class BMap extends IDMap {
+    public BMap() {
+      super("B", "X", false);
+    }
+  }
+
+  public static class CReduce extends IDReduce {
+    public CReduce() {
+      super("C", "C");
+    }
+  }
+
+  public static class DMap extends IDMap {
+    public DMap() {
+      super("D", "X", false);
+    }
+  }
+
+  public static class EMap extends IDMap {
+    public EMap() {
+      super("E", "E", true);
+    }
+  }
+
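+  // Identity mapper that records configure/map/close flags and checks that
+  // the value of "a" and the pass-by-value semantics match what the chain
+  // was configured with.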
+  public static class IDMap
+    implements Mapper<LongWritable, Text, LongWritable, Text> {
+    private JobConf conf;
+    private String name;
+    private String prop;
+    private boolean byValue;
+
+    public IDMap(String name, String prop, boolean byValue) {
+      this.name = name;
+      this.prop = prop;
+      this.byValue = byValue;
+    }
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      assertEquals(prop, conf.get("a"));
+      try {
+        writeFlag(conf, "configure." + name);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    public void map(LongWritable key, Text value,
+                    OutputCollector<LongWritable, Text> output,
+                    Reporter reporter) throws IOException {
+      writeFlag(conf, "map." + name + ".value." + value);
+      key.set(10);
+      output.collect(key, value);
+      if (byValue) {
+        assertEquals(10, key.get());
+      } else {
+        assertNotSame(10, key.get());
+      }
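+      // Set the key to 11 before returning so that an upstream by-reference
+      // caller can observe the mutation after its own collect() returns.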
+      key.set(11);
+    }
+
+    public void close() throws IOException {
+      try {
+        writeFlag(conf, "close." + name);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
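+  // Identity reducer counterpart of IDMap; byValue is never set here, so the
+  // by-reference branch of the check below is always taken.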
+  public static class IDReduce
+    implements Reducer<LongWritable, Text, LongWritable, Text> {
+
+    private JobConf conf;
+    private String name;
+    private String prop;
+    private boolean byValue = false;
+
+    public IDReduce(String name, String prop) {
+      this.name = name;
+      this.prop = prop;
+    }
+
+    public void configure(JobConf conf) {
+      this.conf = conf;
+      assertEquals(prop, conf.get("a"));
+      try {
+        writeFlag(conf, "configure." + name);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    public void reduce(LongWritable key, Iterator<Text> values,
+                       OutputCollector<LongWritable, Text> output,
+                       Reporter reporter) throws IOException {
+      while (values.hasNext()) {
+        Text value = values.next();
+        writeFlag(conf, "reduce." + name + ".value." + value);
+        key.set(10);
+        output.collect(key, value);
+        if (byValue) {
+          assertEquals(10, key.get());
+        } else {
+          assertNotSame(10, key.get());
+        }
+        key.set(11);
+      }
+    }
+
+    public void close() throws IOException {
+      try {
+        writeFlag(conf, "close." + name);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+
+}