Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/09 15:18:44 UTC
svn commit: r693460 - in /hadoop/core/trunk: ./
src/core/org/apache/hadoop/conf/ src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapred/lib/ src/test/org/apache/hadoop/conf/
src/test/org/apache/hadoop/mapred/ src/test/org/apache/hadoo...
Author: ddas
Date: Tue Sep 9 06:18:43 2008
New Revision: 693460
URL: http://svn.apache.org/viewvc?rev=693460&view=rev
Log:
HADOOP-3702. Adds ChainMapper and ChainReducer classes that allow composing chains of Maps and Reduces in a single Map/Reduce job, something like MAP+ / REDUCE MAP*. Contributed by Alejandro Abdelnur.
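[Editor's note] For illustration, a minimal driver sketch of the job shape this enables; AMap, BMap, XReduce, and CMap are hypothetical classes, and the fragment mirrors the usage pattern in the ChainMapper/ChainReducer javadoc below:

JobConf conf = new JobConf(MyDriver.class); // MyDriver is hypothetical
conf.setJobName("chain");

// MAP+ : two Mappers run back to back inside the map task
ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
    Text.class, Text.class, true, null);
ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,
    LongWritable.class, Text.class, false, null);

// REDUCE MAP* : a Reducer followed by one more Mapper inside the reduce task
ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,
    Text.class, Text.class, true, null);
ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,
    LongWritable.class, Text.class, false, null);

JobClient.runJob(conf);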
Added:
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693460&r1=693459&r2=693460&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 9 06:18:43 2008
@@ -141,6 +141,10 @@
HADOOP-3970. Provides a way to recover counters written to JobHistory.
(Amar Kamat via ddas)
+ HADOOP-3702. Adds ChainMapper and ChainReducer classes that allow
+ composing chains of Maps and Reduces in a single Map/Reduce job, something
+ like MAP+ / REDUCE MAP*. (Alejandro Abdelnur via ddas)
+
IMPROVEMENTS
HADOOP-3908. Fuse-dfs: better error message if libhdfs.so doesn't exist.
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java?rev=693460&r1=693459&r2=693460&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java Tue Sep 9 06:18:43 2008
@@ -28,6 +28,7 @@
import java.io.Reader;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,10 +41,10 @@
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.Collection;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
@@ -54,11 +55,13 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
+import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
/**
* Provides access to configuration parameters.
@@ -234,6 +237,19 @@
}
/**
+ * Add a configuration resource.
+ *
+ * The properties of this resource will override properties of previously
+ * added resources, unless they were marked <a href="#Final">final</a>.
+ *
+ * @param in InputStream from which to read the configuration resource.
+ */
+ public void addResource(InputStream in) {
+ addResourceObject(in);
+ }
+
+
+ /**
* Reload configuration from previously added resources.
*
* This method will clear all the configuration read from the added
@@ -554,6 +570,7 @@
return false;
}
+ @Override
public String toString() {
StringBuffer result = new StringBuffer();
boolean first = true;
@@ -874,6 +891,23 @@
}
/**
+ * Return the number of keys in the configuration.
+ *
+ * @return number of keys in the configuration.
+ */
+ public int size() {
+ return getProps().size();
+ }
+
+ /**
+ * Clears all keys from the configuration.
+ */
+ public void clear() {
+ getProps().clear();
+ getOverlay().clear();
+ }
+
+ /**
* Get an {@link Iterator} to go through the list of <code>String</code>
* key-value pairs in the configuration.
*
@@ -944,6 +978,12 @@
in.close();
}
}
+ } else if (name instanceof InputStream) {
+ try {
+ doc = builder.parse((InputStream)name);
+ } finally {
+ ((InputStream)name).close();
+ }
}
if (doc == null) {
@@ -993,11 +1033,19 @@
}
}
- } catch (Exception e) {
+ } catch (IOException e) {
+ LOG.fatal("error parsing conf file: " + e);
+ throw new RuntimeException(e);
+ } catch (DOMException e) {
+ LOG.fatal("error parsing conf file: " + e);
+ throw new RuntimeException(e);
+ } catch (SAXException e) {
+ LOG.fatal("error parsing conf file: " + e);
+ throw new RuntimeException(e);
+ } catch (ParserConfigurationException e) {
LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
}
-
}
/**
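[Editor's note] A quick sketch of the three Configuration methods added above, addResource(InputStream), size(), and clear(); the class name and XML literal are illustrative:

import java.io.ByteArrayInputStream;
import org.apache.hadoop.conf.Configuration;

public class ConfStreamDemo {
  public static void main(String[] args) {
    String xml = "<?xml version=\"1.0\"?><configuration>" +
        "<property><name>a</name><value>A</value></property>" +
        "</configuration>";
    Configuration conf = new Configuration(false); // no default resources
    conf.addResource(new ByteArrayInputStream(xml.getBytes())); // new overload
    System.out.println(conf.get("a")); // A
    System.out.println(conf.size());   // 1, via the new size()
    conf.clear();                      // new clear() empties props and overlay
    System.out.println(conf.size());   // 0
  }
}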
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/WritableJobConf.java Tue Sep 9 06:18:43 2008
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable JobConf is the Writable version of the JobConf.
+ */
+public class WritableJobConf extends JobConf implements Writable {
+
+ /**
+ * This constructor does not load the default configuration files,
+ * hadoop-(default|site).xml.
+ */
+ public WritableJobConf() {
+ super(false); //do not load defaults
+ }
+
+ public WritableJobConf(boolean loadDefaults) {
+ super(loadDefaults);
+ }
+
+ public WritableJobConf(Class<?> exampleClass) {
+ super(exampleClass);
+ }
+
+ public WritableJobConf(Configuration conf, Class<?> exampleClass) {
+ super(conf, exampleClass);
+ }
+
+ public WritableJobConf(Configuration conf) {
+ super(conf);
+ }
+
+ public WritableJobConf(Path config) {
+ super(config);
+ }
+
+ public WritableJobConf(String config) {
+ super(config);
+ }
+
+ @Override
+ public void readFields(final DataInput in) throws IOException {
+ if(in instanceof InputStream) {
+ this.addResource((InputStream)in);
+ } else {
+ this.addResource(new InputStream() {
+ @Override
+ public int read() throws IOException {
+ return in.readByte();
+ }
+ });
+ }
+
+ this.get("foo"); //so that getProps() is called, before returining back.
+ }
+
+ @Override
+ public void write(final DataOutput out) throws IOException {
+ if(out instanceof OutputStream) {
+ write((OutputStream)out);
+ }
+ else {
+ write(new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ out.writeByte(b);
+ }
+ });
+ }
+ }
+
+}
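[Editor's note] A minimal round-trip sketch of the Writable contract implemented above (the demo class is hypothetical); write() emits the configuration as XML and readFields() reads it back through the new Configuration.addResource(InputStream):

import java.io.DataOutput;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.WritableJobConf;

public class WritableJobConfDemo {
  public static void main(String[] args) throws Exception {
    WritableJobConf conf = new WritableJobConf(); // no defaults loaded
    conf.set("chain.demo", "value");

    DataOutputBuffer buffer = new DataOutputBuffer();
    // cast picks the Writable contract over Configuration.write(OutputStream);
    // buffer is also an OutputStream, so the XML is written directly
    conf.write((DataOutput) buffer);

    WritableJobConf copy = new WritableJobConf();
    DataInputBuffer in = new DataInputBuffer();
    in.reset(buffer.getData(), buffer.getLength());
    copy.readFields(in); // parsed via addResource(InputStream)

    System.out.println(copy.get("chain.demo")); // value
  }
}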
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/Chain.java Tue Sep 9 06:18:43 2008
@@ -0,0 +1,543 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.Serialization;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.GenericsUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * The Chain class provides all the common functionality for the
+ * {@link ChainMapper} and the {@link ChainReducer} classes.
+ */
+class Chain {
+ private static final String CHAIN_MAPPER = "chain.mapper";
+ private static final String CHAIN_REDUCER = "chain.reducer";
+
+ private static final String CHAIN_MAPPER_SIZE = ".size";
+ private static final String CHAIN_MAPPER_CLASS = ".mapper.class.";
+ private static final String CHAIN_MAPPER_CONFIG = ".mapper.config.";
+ private static final String CHAIN_REDUCER_CLASS = ".reducer.class";
+ private static final String CHAIN_REDUCER_CONFIG = ".reducer.config";
+
+ private static final String MAPPER_BY_VALUE = "chain.mapper.byValue";
+ private static final String REDUCER_BY_VALUE = "chain.reducer.byValue";
+
+ private static final String MAPPER_INPUT_KEY_CLASS =
+ "chain.mapper.input.key.class";
+ private static final String MAPPER_INPUT_VALUE_CLASS =
+ "chain.mapper.input.value.class";
+ private static final String MAPPER_OUTPUT_KEY_CLASS =
+ "chain.mapper.output.key.class";
+ private static final String MAPPER_OUTPUT_VALUE_CLASS =
+ "chain.mapper.output.value.class";
+ private static final String REDUCER_INPUT_KEY_CLASS =
+ "chain.reducer.input.key.class";
+ private static final String REDUCER_INPUT_VALUE_CLASS =
+ "chain.reducer.input.value.class";
+ private static final String REDUCER_OUTPUT_KEY_CLASS =
+ "chain.reducer.output.key.class";
+ private static final String REDUCER_OUTPUT_VALUE_CLASS =
+ "chain.reducer.output.value.class";
+
+ private boolean isMap;
+
+ private JobConf chainJobConf;
+
+ private List<Mapper> mappers = new ArrayList<Mapper>();
+ private Reducer reducer;
+
+ // to cache the key/value output class serializations for each chain element
+ // to avoid a lookup every time.
+ private List<Serialization> mappersKeySerialization =
+ new ArrayList<Serialization>();
+ private List<Serialization> mappersValueSerialization =
+ new ArrayList<Serialization>();
+ private Serialization reducerKeySerialization;
+ private Serialization reducerValueSerialization;
+
+ /**
+ * Creates a Chain instance configured for a Mapper or a Reducer.
+ *
+ * @param isMap TRUE indicates the chain is for a Mapper, FALSE that it is
+ * for a Reducer.
+ */
+ Chain(boolean isMap) {
+ this.isMap = isMap;
+ }
+
+ /**
+ * Returns the prefix to use for the configuration of the chain depending
+ * if it is for a Mapper or a Reducer.
+ *
+ * @param isMap TRUE for Mapper, FALSE for Reducer.
+ * @return the prefix to use.
+ */
+ private static String getPrefix(boolean isMap) {
+ return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER;
+ }
+
+ /**
+ * Creates a {@link JobConf} for one of the Mappers or the Reducer in the chain.
+ * <p/>
+ * It creates a new JobConf using the chain job's JobConf as base and adds to
+ * it the configuration properties for the chain element. The keys of the
+ * chain element jobConf have precedence over the given JobConf.
+ *
+ * @param jobConf the chain job's JobConf.
+ * @param confKey the key for chain element configuration serialized in the
+ * chain job's JobConf.
+ * @return a new JobConf aggregating the chain job's JobConf with the chain
+ * element configuration properties.
+ */
+ private static JobConf getChainElementConf(JobConf jobConf, String confKey) {
+ JobConf conf;
+ try {
+ Stringifier<WritableJobConf> stringifier =
+ new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+ conf = stringifier.fromString(jobConf.get(confKey, null));
+ } catch (IOException ioex) {
+ throw new RuntimeException(ioex);
+ }
+ // we have to do this because the Writable deserialization clears all
+ // values set in the conf, making it impossible to do a new JobConf(jobConf)
+ // in the creation of the conf above
+ jobConf = new JobConf(jobConf);
+
+ for(Map.Entry<String, String> entry : conf) {
+ jobConf.set(entry.getKey(), entry.getValue());
+ }
+ return jobConf;
+ }
+
+ /**
+ * Adds a Mapper class to the chain job's JobConf.
+ * <p/>
+ * The configuration properties of the Mapper have precedence over the
+ * configuration properties of the chain job.
+ *
+ * @param isMap indicates if the Chain is for a Mapper or for a
+ * Reducer.
+ * @param jobConf chain job's JobConf to add the Mapper class.
+ * @param klass the Mapper class to add.
+ * @param inputKeyClass mapper input key class.
+ * @param inputValueClass mapper input value class.
+ * @param outputKeyClass mapper output key class.
+ * @param outputValueClass mapper output value class.
+ * @param byValue indicates if key/values should be passed by value
+ * to the next Mapper in the chain, if any.
+ * @param mapperConf a JobConf with the configuration for the Mapper
+ * class. It is recommended to use a JobConf without default values using the
+ * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+ */
+ public static <K1, V1, K2, V2> void addMapper(boolean isMap, JobConf jobConf,
+ Class<? extends Mapper<K1, V1, K2, V2>> klass,
+ Class<? extends K1> inputKeyClass,
+ Class<? extends V1> inputValueClass,
+ Class<? extends K2> outputKeyClass,
+ Class<? extends V2> outputValueClass,
+ boolean byValue, JobConf mapperConf) {
+ String prefix = getPrefix(isMap);
+
+ // if it is a reducer chain, check that the Reducer has already been set
+ if (!isMap) {
+ if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS,
+ null) == null) {
+ throw new IllegalStateException(
+ "A Mapper can be added to the chain only after the Reducer has " +
+ "been set");
+ }
+ }
+ int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+ jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class);
+
+ // if it is a reducer chain and the first Mapper is being added, check that
+ // the key and value input classes of the mapper match those of the reducer
+ // output.
+ if (!isMap && index == 0) {
+ JobConf reducerConf =
+ getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+ if (! inputKeyClass.isAssignableFrom(
+ reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) {
+ throw new IllegalArgumentException("The Reducer output key class does" +
+ " not match the Mapper input key class");
+ }
+ if (! inputValueClass.isAssignableFrom(
+ reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) {
+ throw new IllegalArgumentException("The Reducer output value class" +
+ " does not match the Mapper input value class");
+ }
+ } else if (index > 0) {
+ // check that the new Mapper's key and value input classes match those of
+ // the previous Mapper's output.
+ JobConf previousMapperConf =
+ getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG +
+ (index - 1));
+ if (! inputKeyClass.isAssignableFrom(
+ previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) {
+ throw new IllegalArgumentException("The Mapper output key class does" +
+ " not match the previous Mapper input key class");
+ }
+ if (! inputValueClass.isAssignableFrom(
+ previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) {
+ throw new IllegalArgumentException("The Mapper output value class" +
+ " does not match the previous Mapper input value class");
+ }
+ }
+
+ // if the Mapper does not have a private JobConf create an empty one
+ if (mapperConf == null) {
+ // using a JobConf without defaults to make it lightweight.
+ // still the chain JobConf may have all defaults and this conf is
+ // overlaid on the chain JobConf.
+ mapperConf = new JobConf(false);
+ }
+
+ // store in the private mapper conf the input/output classes of the mapper
+ // and if it works by value or by reference
+ mapperConf.setBoolean(MAPPER_BY_VALUE, byValue);
+ mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+ mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass,
+ Object.class);
+ mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class);
+ mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass,
+ Object.class);
+
+ // serialize the private mapper jobconf in the chain jobconf.
+ Stringifier<WritableJobConf> stringifier =
+ new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+ try {
+ jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index,
+ stringifier.toString(new WritableJobConf(mapperConf)));
+ }
+ catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+
+ // increment the chain counter
+ jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1);
+ }
+
+ /**
+ * Sets the Reducer class to the chain job's JobConf.
+ * <p/>
+ * The configuration properties of the Reducer have precedence over the
+ * configuration properties of the chain job.
+ *
+ * @param jobConf chain job's JobConf to add the Reducer class.
+ * @param klass the Reducer class to add.
+ * @param inputKeyClass reducer input key class.
+ * @param inputValueClass reducer input value class.
+ * @param outputKeyClass reducer output key class.
+ * @param outputValueClass reducer output value class.
+ * @param byValue indicates if key/values should be passed by value
+ * to the next Mapper in the chain, if any.
+ * @param reducerConf a JobConf with the configuration for the Reducer
+ * class. It is recommended to use a JobConf without default values using the
+ * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+ */
+ public static <K1, V1, K2, V2> void setReducer(JobConf jobConf,
+ Class<? extends Reducer<K1, V1, K2, V2>> klass,
+ Class<? extends K1> inputKeyClass,
+ Class<? extends V1> inputValueClass,
+ Class<? extends K2> outputKeyClass,
+ Class<? extends V2> outputValueClass,
+ boolean byValue, JobConf reducerConf) {
+ String prefix = getPrefix(false);
+
+ if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) {
+ throw new IllegalStateException("Reducer has been already set");
+ }
+
+ jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class);
+
+ // if the Reducer does not have a private JobConf create an empty one
+ if (reducerConf == null) {
+ // using a JobConf without defaults to make it lightweight.
+ // still the chain JobConf may have all defaults and this conf is
+ // overlaid on the chain JobConf.
+ reducerConf = new JobConf(false);
+ }
+
+ // store in the private reducer conf the input/output classes of the reducer
+ // and if it works by value or by reference
+ reducerConf.setBoolean(REDUCER_BY_VALUE, byValue);
+ reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class);
+ reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass,
+ Object.class);
+ reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass,
+ Object.class);
+ reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass,
+ Object.class);
+
+ // serialize the private reducer jobconf in the chain jobconf.
+ Stringifier<WritableJobConf> stringifier =
+ new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
+ try {
+ jobConf.set(prefix + CHAIN_REDUCER_CONFIG,
+ stringifier.toString(new WritableJobConf(reducerConf)));
+ }
+ catch (IOException ioEx) {
+ throw new RuntimeException(ioEx);
+ }
+ }
+
+ /**
+ * Configures all the chain elements for the task.
+ *
+ * @param jobConf chain job's JobConf.
+ */
+ public void configure(JobConf jobConf) {
+ String prefix = getPrefix(isMap);
+ chainJobConf = jobConf;
+ SerializationFactory serializationFactory =
+ new SerializationFactory(chainJobConf);
+ int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0);
+ for (int i = 0; i < index; i++) {
+ Class<? extends Mapper> klass =
+ jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class);
+ JobConf mConf =
+ getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i);
+ Mapper mapper = ReflectionUtils.newInstance(klass, mConf);
+ mappers.add(mapper);
+
+ if (mConf.getBoolean(MAPPER_BY_VALUE, true)) {
+ mappersKeySerialization.add(serializationFactory.getSerialization(
+ mConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null)));
+ mappersValueSerialization.add(serializationFactory.getSerialization(
+ mConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null)));
+ } else {
+ mappersKeySerialization.add(null);
+ mappersValueSerialization.add(null);
+ }
+ }
+ Class<? extends Reducer> klass =
+ jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class);
+ if (klass != null) {
+ JobConf rConf =
+ getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG);
+ reducer = ReflectionUtils.newInstance(klass, rConf);
+ if (rConf.getBoolean(REDUCER_BY_VALUE, true)) {
+ reducerKeySerialization = serializationFactory
+ .getSerialization(rConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null));
+ reducerValueSerialization = serializationFactory
+ .getSerialization(rConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null));
+ } else {
+ reducerKeySerialization = null;
+ reducerValueSerialization = null;
+ }
+ }
+ }
+
+ /**
+ * Returns the chain job conf.
+ *
+ * @return the chain job conf.
+ */
+ protected JobConf getChainJobConf() {
+ return chainJobConf;
+ }
+
+ /**
+ * Returns the first Mapper instance in the chain.
+ *
+ * @return the first Mapper instance in the chain or NULL if none.
+ */
+ public Mapper getFirstMap() {
+ return (mappers.size() > 0) ? mappers.get(0) : null;
+ }
+
+ /**
+ * Returns the Reducer instance in the chain.
+ *
+ * @return the Reducer instance in the chain or NULL if none.
+ */
+ public Reducer getReducer() {
+ return reducer;
+ }
+
+ /**
+ * Returns the OutputCollector to be used by a Mapper instance in the chain.
+ *
+ * @param mapperIndex index of the Mapper instance to get the OutputCollector.
+ * @param output the original OutputCollector of the task.
+ * @param reporter the reporter of the task.
+ * @return the OutputCollector to be used in the chain.
+ */
+ @SuppressWarnings({"unchecked"})
+ public OutputCollector getMapperCollector(int mapperIndex,
+ OutputCollector output,
+ Reporter reporter) {
+ Serialization keySerialization = mappersKeySerialization.get(mapperIndex);
+ Serialization valueSerialization =
+ mappersValueSerialization.get(mapperIndex);
+ return new ChainOutputCollector(mapperIndex, keySerialization,
+ valueSerialization, output, reporter);
+ }
+
+ /**
+ * Returns the OutputCollector to be used by a Mapper instance in the chain.
+ *
+ * @param output the original OutputCollector of the task.
+ * @param reporter the reporter of the task.
+ * @return the OutputCollector to be used in the chain.
+ */
+ @SuppressWarnings({"unchecked"})
+ public OutputCollector getReducerCollector(OutputCollector output,
+ Reporter reporter) {
+ return new ChainOutputCollector(reducerKeySerialization,
+ reducerValueSerialization, output,
+ reporter);
+ }
+
+ /**
+ * Closes all the chain elements.
+ *
+ * @throws IOException thrown if any of the chain elements throws an
+ * IOException.
+ */
+ public void close() throws IOException {
+ for (Mapper map : mappers) {
+ map.close();
+ }
+ if (reducer != null) {
+ reducer.close();
+ }
+ }
+
+ // using a ThreadLocal to reuse the DataOutputBuffer used for ser/deser;
+ // it has to be a thread local because otherwise it would break if used
+ // from a MultithreadedMapRunner.
+ private ThreadLocal<DataOutputBuffer> threadLocalDataOutputBuffer =
+ new ThreadLocal<DataOutputBuffer>() {
+ protected DataOutputBuffer initialValue() {
+ return new DataOutputBuffer(1024);
+ }
+ };
+
+ /**
+ * OutputCollector implementation used by the chain tasks.
+ * <p/>
+ * If it is not the end of the chain, a {@link #collect} invocation invokes
+ * the next Mapper in the chain. If it is the end of the chain the task
+ * OutputCollector is called.
+ */
+ private class ChainOutputCollector<K, V> implements OutputCollector<K, V> {
+ private int nextMapperIndex;
+ private Serialization<K> keySerialization;
+ private Serialization<V> valueSerialization;
+ private OutputCollector output;
+ private Reporter reporter;
+
+ /*
+ * Constructor for Mappers
+ */
+ public ChainOutputCollector(int index, Serialization<K> keySerialization,
+ Serialization<V> valueSerialization,
+ OutputCollector output, Reporter reporter) {
+ this.nextMapperIndex = index + 1;
+ this.keySerialization = keySerialization;
+ this.valueSerialization = valueSerialization;
+ this.output = output;
+ this.reporter = reporter;
+ }
+
+ /*
+ * Constructor for Reducer
+ */
+ public ChainOutputCollector(Serialization<K> keySerialization,
+ Serialization<V> valueSerialization,
+ OutputCollector output, Reporter reporter) {
+ this.nextMapperIndex = 0;
+ this.keySerialization = keySerialization;
+ this.valueSerialization = valueSerialization;
+ this.output = output;
+ this.reporter = reporter;
+ }
+
+ @SuppressWarnings({"unchecked"})
+ public void collect(K key, V value) throws IOException {
+ if (nextMapperIndex < mappers.size()) {
+ // there is a next mapper in chain
+
+ // only need to ser/deser if there is next mapper in the chain
+ if (keySerialization != null) {
+ key = makeCopyForPassByValue(keySerialization, key);
+ value = makeCopyForPassByValue(valueSerialization, value);
+ }
+
+ // gets ser/deser and mapper of next in chain
+ Serialization nextKeySerialization =
+ mappersKeySerialization.get(nextMapperIndex);
+ Serialization nextValueSerialization =
+ mappersValueSerialization.get(nextMapperIndex);
+ Mapper nextMapper = mappers.get(nextMapperIndex);
+
+ // invokes next mapper in chain
+ nextMapper.map(key, value,
+ new ChainOutputCollector(nextMapperIndex,
+ nextKeySerialization,
+ nextValueSerialization,
+ output, reporter),
+ reporter);
+ } else {
+ // end of chain, use the task's real output collector
+ output.collect(key, value);
+ }
+ }
+
+ private <E> E makeCopyForPassByValue(Serialization<E> serialization,
+ E obj) throws IOException {
+ Serializer<E> ser =
+ serialization.getSerializer(GenericsUtil.getClass(obj));
+ Deserializer<E> deser =
+ serialization.getDeserializer(GenericsUtil.getClass(obj));
+
+ DataOutputBuffer dof = threadLocalDataOutputBuffer.get();
+
+ dof.reset();
+ ser.open(dof);
+ ser.serialize(obj);
+ ser.close();
+ obj = ReflectionUtils.newInstance(GenericsUtil.getClass(obj),
+ getChainJobConf());
+ ByteArrayInputStream bais =
+ new ByteArrayInputStream(dof.getData(), 0, dof.getLength());
+ deser.open(bais);
+ deser.deserialize(obj);
+ deser.close();
+ return obj;
+ }
+
+ }
+
+}
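[Editor's note] To make the configuration plumbing above concrete, a sketch of how a per-element JobConf travels inside the chain job's JobConf, mirroring what addMapper() and getChainElementConf() do; the literal key spells out CHAIN_MAPPER + CHAIN_MAPPER_CONFIG + index from the constants above, and the demo class is hypothetical:

import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.Stringifier;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.WritableJobConf;

public class ChainConfDemo {
  public static void main(String[] args) throws Exception {
    JobConf jobConf = new JobConf(); // chain job's conf, with defaults

    JobConf mapperConf = new JobConf(false); // per-mapper conf, no defaults
    mapperConf.set("a", "A");

    // serialize the element conf into the chain conf, as addMapper() does
    Stringifier<WritableJobConf> stringifier =
        new DefaultStringifier<WritableJobConf>(jobConf, WritableJobConf.class);
    jobConf.set("chain.mapper.mapper.config.0",
        stringifier.toString(new WritableJobConf(mapperConf)));

    // later, in the task, getChainElementConf() reads it back and overlays
    // its entries on a copy of the chain conf
    WritableJobConf restored =
        stringifier.fromString(jobConf.get("chain.mapper.mapper.config.0"));
    System.out.println(restored.get("a")); // A
  }
}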
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainMapper.java Tue Sep 9 06:18:43 2008
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+
+/**
+ * The ChainMapper class allows the use of multiple Mapper classes within a
+ * single Map task.
+ * <p/>
+ * The Mapper classes are invoked in a chained (or piped) fashion: the output
+ * of the first becomes the input of the second, and so on until the last
+ * Mapper, whose output is written to the task's output.
+ * <p/>
+ * The key functionality of this feature is that the Mappers in the chain do not
+ * need to be aware that they are executed in a chain. This enables having
+ * reusable specialized Mappers that can be combined to perform composite
+ * operations within a single task.
+ * <p/>
+ * Special care has to be taken when creating chains to ensure that the
+ * key/values output by a Mapper are valid for the following Mapper in the
+ * chain. It is assumed all Mappers and the Reducer in the chain use matching
+ * output and input key and value classes, as no conversion is done by the
+ * chaining code.
+ * <p/>
+ * Using the ChainMapper and the ChainReducer classes it is possible to
+ * compose Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>.
+ * An immediate benefit of this pattern is a dramatic reduction in disk IO.
+ * <p/>
+ * IMPORTANT: There is no need to specify the output key/value classes for
+ * the ChainMapper; this is done by the addMapper for the last mapper in the
+ * chain.
+ * <p/>
+ * ChainMapper usage pattern:
+ * <p/>
+ * <pre>
+ * ...
+ * conf.setJobName("chain");
+ * conf.setInputFormat(TextInputFormat.class);
+ * conf.setOutputFormat(TextOutputFormat.class);
+ * <p/>
+ * JobConf mapAConf = new JobConf(false);
+ * ...
+ * ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
+ * Text.class, Text.class, true, mapAConf);
+ * <p/>
+ * JobConf mapBConf = new JobConf(false);
+ * ...
+ * ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,
+ * LongWritable.class, Text.class, false, mapBConf);
+ * <p/>
+ * JobConf reduceConf = new JobConf(false);
+ * ...
+ * ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,
+ * Text.class, Text.class, true, reduceConf);
+ * <p/>
+ * ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,
+ * LongWritable.class, Text.class, false, null);
+ * <p/>
+ * ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
+ * LongWritable.class, LongWritable.class, true, null);
+ * <p/>
+ * FileInputFormat.setInputPaths(conf, inDir);
+ * FileOutputFormat.setOutputPath(conf, outDir);
+ * ...
+ * <p/>
+ * JobClient jc = new JobClient(conf);
+ * RunningJob job = jc.submitJob(conf);
+ * ...
+ * </pre>
+ */
+public class ChainMapper implements Mapper {
+
+ /**
+ * Adds a Mapper class to the chain job's JobConf.
+ * <p/>
+ * It must be specified how keys and values are passed from one element of
+ * the chain to the next: by value or by reference. If a Mapper relies on the
+ * assumed semantics that keys and values are not modified by the collector,
+ * 'by value' must be used. If the Mapper does not rely on these semantics,
+ * 'by reference' can be used as an optimization to avoid serialization and
+ * deserialization.
+ * <p/>
+ * For the added Mapper the configuration given for it,
+ * <code>mapperConf</code>, has precedence over the job's JobConf. This
+ * precedence is in effect when the task is running.
+ * <p/>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainMapper; this is done by the addMapper for the last mapper in the chain.
+ * <p/>
+ *
+ * @param job job's JobConf to add the Mapper class.
+ * @param klass the Mapper class to add.
+ * @param inputKeyClass mapper input key class.
+ * @param inputValueClass mapper input value class.
+ * @param outputKeyClass mapper output key class.
+ * @param outputValueClass mapper output value class.
+ * @param byValue indicates if key/values should be passed by value
+ * to the next Mapper in the chain, if any.
+ * @param mapperConf a JobConf with the configuration for the Mapper
+ * class. It is recommended to use a JobConf without default values using the
+ * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+ */
+ public static <K1, V1, K2, V2> void addMapper(JobConf job,
+ Class<? extends Mapper<K1, V1, K2, V2>> klass,
+ Class<? extends K1> inputKeyClass,
+ Class<? extends V1> inputValueClass,
+ Class<? extends K2> outputKeyClass,
+ Class<? extends V2> outputValueClass,
+ boolean byValue, JobConf mapperConf) {
+ job.setMapperClass(ChainMapper.class);
+ job.setMapOutputKeyClass(outputKeyClass);
+ job.setMapOutputValueClass(outputValueClass);
+ Chain.addMapper(true, job, klass, inputKeyClass, inputValueClass,
+ outputKeyClass, outputValueClass, byValue, mapperConf);
+ }
+
+ private Chain chain;
+
+ /**
+ * Constructor.
+ */
+ public ChainMapper() {
+ chain = new Chain(true);
+ }
+
+ /**
+ * Configures the ChainMapper and all the Mappers in the chain.
+ * <p/>
+ * If this method is overridden <code>super.configure(...)</code> should be
+ * invoked at the beginning of the overriding method.
+ */
+ public void configure(JobConf job) {
+ chain.configure(job);
+ }
+
+ /**
+ * Chains the <code>map(...)</code> methods of the Mappers in the chain.
+ */
+ @SuppressWarnings({"unchecked"})
+ public void map(Object key, Object value, OutputCollector output,
+ Reporter reporter) throws IOException {
+ Mapper mapper = chain.getFirstMap();
+ if (mapper != null) {
+ mapper.map(key, value, chain.getMapperCollector(0, output, reporter),
+ reporter);
+ }
+ }
+
+ /**
+ * Closes the ChainMapper and all the Mappers in the chain.
+ * <p/>
+ * If this method is overridden <code>super.close()</code> should be
+ * invoked at the end of the overriding method.
+ */
+ public void close() throws IOException {
+ chain.close();
+ }
+
+}
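[Editor's note] To illustrate the 'reusable specialized Mappers' point from the javadoc, a sketch of a chain-agnostic Mapper (hypothetical class); it could be added to a chain with ChainMapper.addMapper(conf, UpperCaseMap.class, LongWritable.class, Text.class, LongWritable.class, Text.class, true, null):

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UpperCaseMap extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, Text> {
  public void map(LongWritable key, Text value,
                  OutputCollector<LongWritable, Text> output,
                  Reporter reporter) throws IOException {
    // unaware of any chain: it just transforms the value and collects
    output.collect(key, new Text(value.toString().toUpperCase()));
  }
}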
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/lib/ChainReducer.java Tue Sep 9 06:18:43 2008
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.mapred.*;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * The ChainReducer class allows chaining multiple Mapper classes after a
+ * Reducer within the Reducer task.
+ * <p/>
+ * For each record output by the Reducer, the Mapper classes are invoked in a
+ * chained (or piped) fashion: the output of the first becomes the input of
+ * the second, and so on until the last Mapper, whose output is written to
+ * the task's output.
+ * <p/>
+ * The key functionality of this feature is that the Mappers in the chain do not
+ * need to be aware that they are executed after the Reducer or in a chain.
+ * This enables having reusable specialized Mappers that can be combined to
+ * perform composite operations within a single task.
+ * <p/>
+ * Special care has to be taken when creating chains to ensure that the
+ * key/values output by a Mapper are valid for the following Mapper in the
+ * chain. It is assumed all Mappers and the Reducer in the chain use matching
+ * output and input key and value classes, as no conversion is done by the
+ * chaining code.
+ * <p/>
+ * Using the ChainMapper and the ChainReducer classes it is possible to
+ * compose Map/Reduce jobs that look like <code>[MAP+ / REDUCE MAP*]</code>.
+ * An immediate benefit of this pattern is a dramatic reduction in disk IO.
+ * <p/>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainReducer; this is done by the setReducer or the addMapper for the last
+ * element in the chain.
+ * <p/>
+ * ChainReducer usage pattern:
+ * <p/>
+ * <pre>
+ * ...
+ * conf.setJobName("chain");
+ * conf.setInputFormat(TextInputFormat.class);
+ * conf.setOutputFormat(TextOutputFormat.class);
+ * <p/>
+ * JobConf mapAConf = new JobConf(false);
+ * ...
+ * ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
+ * Text.class, Text.class, true, mapAConf);
+ * <p/>
+ * JobConf mapBConf = new JobConf(false);
+ * ...
+ * ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class,
+ * LongWritable.class, Text.class, false, mapBConf);
+ * <p/>
+ * JobConf reduceConf = new JobConf(false);
+ * ...
+ * ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class,
+ * Text.class, Text.class, true, reduceConf);
+ * <p/>
+ * ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class,
+ * LongWritable.class, Text.class, false, null);
+ * <p/>
+ * ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
+ * LongWritable.class, LongWritable.class, true, null);
+ * <p/>
+ * FileInputFormat.setInputPaths(conf, inDir);
+ * FileOutputFormat.setOutputPath(conf, outDir);
+ * ...
+ * <p/>
+ * JobClient jc = new JobClient(conf);
+ * RunningJob job = jc.submitJob(conf);
+ * ...
+ * </pre>
+ */
+public class ChainReducer implements Reducer {
+
+ /**
+ * Sets the Reducer class to the chain job's JobConf.
+ * <p/>
+ * It must be specified how keys and values are passed from one element of
+ * the chain to the next: by value or by reference. If a Reducer relies on
+ * the assumed semantics that keys and values are not modified by the
+ * collector, 'by value' must be used. If the Reducer does not rely on these
+ * semantics, 'by reference' can be used as an optimization to avoid
+ * serialization and deserialization.
+ * <p/>
+ * For the added Reducer the configuration given for it,
+ * <code>reducerConf</code>, has precedence over the job's JobConf. This
+ * precedence is in effect when the task is running.
+ * <p/>
+ * IMPORTANT: There is no need to specify the output key/value classes for the
+ * ChainReducer; this is done by the setReducer or the addMapper for the last
+ * element in the chain.
+ *
+ * @param job job's JobConf to add the Reducer class.
+ * @param klass the Reducer class to add.
+ * @param inputKeyClass reducer input key class.
+ * @param inputValueClass reducer input value class.
+ * @param outputKeyClass reducer output key class.
+ * @param outputValueClass reducer output value class.
+ * @param byValue indicates if key/values should be passed by value
+ * to the next Mapper in the chain, if any.
+ * @param reducerConf a JobConf with the configuration for the Reducer
+ * class. It is recommended to use a JobConf without default values using the
+ * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+ */
+ public static <K1, V1, K2, V2> void setReducer(JobConf job,
+ Class<? extends Reducer<K1, V1, K2, V2>> klass,
+ Class<? extends K1> inputKeyClass,
+ Class<? extends V1> inputValueClass,
+ Class<? extends K2> outputKeyClass,
+ Class<? extends V2> outputValueClass,
+ boolean byValue, JobConf reducerConf) {
+ job.setReducerClass(ChainReducer.class);
+ job.setOutputKeyClass(outputKeyClass);
+ job.setOutputValueClass(outputValueClass);
+ Chain.setReducer(job, klass, inputKeyClass, inputValueClass, outputKeyClass,
+ outputValueClass, byValue, reducerConf);
+ }
+
+ /**
+ * Adds a Mapper class to the chain job's JobConf.
+ * <p/>
+ * It must be specified how keys and values are passed from one element of
+ * the chain to the next: by value or by reference. If a Mapper relies on the
+ * assumed semantics that keys and values are not modified by the collector,
+ * 'by value' must be used. If the Mapper does not rely on these semantics,
+ * 'by reference' can be used as an optimization to avoid serialization and
+ * deserialization.
+ * <p/>
+ * For the added Mapper the configuration given for it,
+ * <code>mapperConf</code>, has precedence over the job's JobConf. This
+ * precedence is in effect when the task is running.
+ * <p/>
+ * IMPORTANT: There is no need to specify the output key/value classes for
+ * the ChainMapper; this is done by the addMapper for the last mapper in the
+ * chain.
+ *
+ * @param job chain job's JobConf to add the Mapper class.
+ * @param klass the Mapper class to add.
+ * @param inputKeyClass mapper input key class.
+ * @param inputValueClass mapper input value class.
+ * @param outputKeyClass mapper output key class.
+ * @param outputValueClass mapper output value class.
+ * @param byValue indicates if key/values should be passed by value
+ * to the next Mapper in the chain, if any.
+ * @param mapperConf a JobConf with the configuration for the Mapper
+ * class. It is recommended to use a JobConf without default values using the
+ * <code>JobConf(boolean loadDefaults)</code> constructor with FALSE.
+ */
+ public static <K1, V1, K2, V2> void addMapper(JobConf job,
+ Class<? extends Mapper<K1, V1, K2, V2>> klass,
+ Class<? extends K1> inputKeyClass,
+ Class<? extends V1> inputValueClass,
+ Class<? extends K2> outputKeyClass,
+ Class<? extends V2> outputValueClass,
+ boolean byValue, JobConf mapperConf) {
+ job.setOutputKeyClass(outputKeyClass);
+ job.setOutputValueClass(outputValueClass);
+ Chain.addMapper(false, job, klass, inputKeyClass, inputValueClass,
+ outputKeyClass, outputValueClass, byValue, mapperConf);
+ }
+
+ private Chain chain;
+
+ /**
+ * Constructor.
+ */
+ public ChainReducer() {
+ chain = new Chain(false);
+ }
+
+ /**
+ * Configures the ChainReducer, the Reducer and all the Mappers in the chain.
+ * <p/>
+ * If this method is overridden <code>super.configure(...)</code> should be
+ * invoked at the beginning of the overriding method.
+ */
+ public void configure(JobConf job) {
+ chain.configure(job);
+ }
+
+ /**
+ * Chains the <code>reduce(...)</code> method of the Reducer with the
+ * <code>map(...) </code> methods of the Mappers in the chain.
+ */
+ @SuppressWarnings({"unchecked"})
+ public void reduce(Object key, Iterator values, OutputCollector output,
+ Reporter reporter) throws IOException {
+ Reducer reducer = chain.getReducer();
+ if (reducer != null) {
+ reducer.reduce(key, values, chain.getReducerCollector(output, reporter),
+ reporter);
+ }
+ }
+
+ /**
+ * Closes the ChainReducer, the Reducer and all the Mappers in the chain.
+ * <p/>
+ * If this method is overridden <code>super.close()</code> should be
+ * invoked at the end of the overriding method.
+ */
+ public void close() throws IOException {
+ chain.close();
+ }
+
+}
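[Editor's note] To make the by-value/by-reference choice concrete, a sketch (hypothetical class) of a Mapper that reuses its output object, a common Hadoop idiom; because it assumes the collector does not hold on to the object, it must be chained with byValue=true so the ChainOutputCollector ser/deser-copies the pair before invoking the next element:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class TrimMap extends MapReduceBase
    implements Mapper<LongWritable, Text, LongWritable, Text> {
  private final Text out = new Text(); // reused for every record

  public void map(LongWritable key, Text value,
                  OutputCollector<LongWritable, Text> output,
                  Reporter reporter) throws IOException {
    out.set(value.toString().trim());
    output.collect(key, out); // safe only when chained byValue=true
  }
}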
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java?rev=693460&r1=693459&r2=693460&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java Tue Sep 9 06:18:43 2008
@@ -21,6 +21,10 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Random;
@@ -333,7 +337,23 @@
assertEquals(null, conf.get("test.key2"));
assertEquals("value5", conf.get("test.key4"));
}
-
+
+ public void testSize() throws IOException {
+ Configuration conf = new Configuration(false);
+ conf.set("a", "A");
+ conf.set("b", "B");
+ assertEquals(2, conf.size());
+ }
+
+ public void testClear() throws IOException {
+ Configuration conf = new Configuration(false);
+ conf.set("a", "A");
+ conf.set("b", "B");
+ conf.clear();
+ assertEquals(0, conf.size());
+ assertFalse(conf.iterator().hasNext());
+ }
+
public static void main(String[] argv) throws Exception {
junit.textui.TestRunner.main(new String[]{
TestConfiguration.class.getName()
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestWritableJobConf.java Tue Sep 9 06:18:43 2008
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.util.GenericsUtil;
+
+public class TestWritableJobConf extends TestCase {
+
+ private static final Configuration CONF = new Configuration();
+
+ private <K> K serDeser(K conf) throws Exception {
+ SerializationFactory factory = new SerializationFactory(CONF);
+ Serializer<K> serializer =
+ factory.getSerializer(GenericsUtil.getClass(conf));
+ Deserializer<K> deserializer =
+ factory.getDeserializer(GenericsUtil.getClass(conf));
+
+ DataOutputBuffer out = new DataOutputBuffer();
+ serializer.open(out);
+ serializer.serialize(conf);
+ serializer.close();
+
+ DataInputBuffer in = new DataInputBuffer();
+ in.reset(out.getData(), out.getLength());
+ deserializer.open(in);
+ K after = deserializer.deserialize(null);
+ deserializer.close();
+ return after;
+ }
+
+ private void assertEquals(Configuration conf1, Configuration conf2) {
+ assertEquals(conf1.size(), conf2.size());
+
+ Iterator<Map.Entry<String, String>> iterator1 = conf1.iterator();
+ Map<String, String> map1 = new HashMap<String,String>();
+ while (iterator1.hasNext()) {
+ Map.Entry<String, String> entry = iterator1.next();
+ map1.put(entry.getKey(), entry.getValue());
+ }
+
+ Iterator<Map.Entry<String, String>> iterator2 = conf2.iterator();
+ Map<String, String> map2 = new HashMap<String,String>();
+ while (iterator2.hasNext()) {
+ Map.Entry<String, String> entry = iterator2.next();
+ map2.put(entry.getKey(), entry.getValue());
+ }
+
+ assertEquals(map1, map2);
+ }
+
+ public void testEmptyConfiguration() throws Exception {
+ WritableJobConf conf = new WritableJobConf();
+ Configuration deser = serDeser(conf);
+ assertEquals(conf, deser);
+ }
+
+ public void testNonEmptyConfiguration() throws Exception {
+ WritableJobConf conf = new WritableJobConf();
+ conf.set("a", "A");
+ conf.set("b", "B");
+ Configuration deser = serDeser(conf);
+ assertEquals(conf, deser);
+ }
+
+ public void testConfigurationWithDefaults() throws Exception {
+ WritableJobConf conf = new WritableJobConf(true);
+ conf.set("a", "A");
+ conf.set("b", "B");
+ Configuration deser = serDeser(conf);
+ assertEquals(conf, deser);
+ }
+
+}
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java?rev=693460&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestChainMapReduce.java Tue Sep 9 06:18:43 2008
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred.lib;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.*;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+
+public class TestChainMapReduce extends HadoopTestCase {
+
+ private static Path getFlagDir(boolean local) {
+ Path flagDir = new Path("testing/chain/flags");
+
+ // Hack for local FS that does not have the concept of a 'mounting point'
+ if (local) {
+ String localPathRoot = System.getProperty("test.build.data", "/tmp")
+ .replace(' ', '+');
+ flagDir = new Path(localPathRoot, flagDir);
+ }
+ return flagDir;
+ }
+
+ private static void cleanFlags(JobConf conf) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ fs.delete(getFlagDir(conf.getBoolean("localFS", true)), true);
+ fs.mkdirs(getFlagDir(conf.getBoolean("localFS", true)));
+ }
+
+ private static void writeFlag(JobConf conf, String flag) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (getFlag(conf, flag)) {
+ fail("Flag " + flag + " already exists");
+ }
+ DataOutputStream file =
+ fs.create(new Path(getFlagDir(conf.getBoolean("localFS", true)), flag));
+ file.close();
+ }
+
+ private static boolean getFlag(JobConf conf, String flag) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ return fs
+ .exists(new Path(getFlagDir(conf.getBoolean("localFS", true)), flag));
+ }
+
+ public TestChainMapReduce() throws IOException {
+ super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
+ }
+
+ public void testChain() throws Exception {
+ Path inDir = new Path("testing/chain/input");
+ Path outDir = new Path("testing/chain/output");
+
+ // Hack for local FS that does not have the concept of a 'mounting point'
+ if (isLocalFS()) {
+ String localPathRoot = System.getProperty("test.build.data", "/tmp")
+ .replace(' ', '+');
+ inDir = new Path(localPathRoot, inDir);
+ outDir = new Path(localPathRoot, outDir);
+ }
+
+
+ JobConf conf = createJobConf();
+ conf.setBoolean("localFS", isLocalFS());
+
+ cleanFlags(conf);
+
+ FileSystem fs = FileSystem.get(conf);
+
+ fs.delete(outDir, true);
+ if (!fs.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+
+ DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+ file.writeBytes("1\n2\n");
+ file.close();
+
+ conf.setJobName("chain");
+ conf.setInputFormat(TextInputFormat.class);
+ conf.setOutputFormat(TextOutputFormat.class);
+
+ conf.set("a", "X");
+
+ JobConf mapAConf = new JobConf(false);
+ mapAConf.set("a", "A");
+ ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class,
+ LongWritable.class, Text.class, true, mapAConf);
+
+ ChainMapper.addMapper(conf, BMap.class, LongWritable.class, Text.class,
+ LongWritable.class, Text.class, false, null);
+
+ JobConf reduceConf = new JobConf(false);
+ reduceConf.set("a", "C");
+ ChainReducer.setReducer(conf, CReduce.class, LongWritable.class, Text.class,
+ LongWritable.class, Text.class, true, reduceConf);
+
+ ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class,
+ LongWritable.class, Text.class, false, null);
+
+ JobConf mapEConf = new JobConf(false);
+ mapEConf.set("a", "E");
+ ChainReducer.addMapper(conf, EMap.class, LongWritable.class, Text.class,
+ LongWritable.class, Text.class, true, mapEConf);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+
+ JobClient jc = new JobClient(conf);
+ RunningJob job = jc.submitJob(conf);
+ while (!job.isComplete()) {
+ Thread.sleep(100);
+ }
+
+ assertTrue(getFlag(conf, "configure.A"));
+ assertTrue(getFlag(conf, "configure.B"));
+ assertTrue(getFlag(conf, "configure.C"));
+ assertTrue(getFlag(conf, "configure.D"));
+ assertTrue(getFlag(conf, "configure.E"));
+
+ assertTrue(getFlag(conf, "map.A.value.1"));
+ assertTrue(getFlag(conf, "map.A.value.2"));
+ assertTrue(getFlag(conf, "map.B.value.1"));
+ assertTrue(getFlag(conf, "map.B.value.2"));
+ assertTrue(getFlag(conf, "reduce.C.value.2"));
+ assertTrue(getFlag(conf, "reduce.C.value.1"));
+ assertTrue(getFlag(conf, "map.D.value.1"));
+ assertTrue(getFlag(conf, "map.D.value.2"));
+ assertTrue(getFlag(conf, "map.E.value.1"));
+ assertTrue(getFlag(conf, "map.E.value.2"));
+
+ assertTrue(getFlag(conf, "close.A"));
+ assertTrue(getFlag(conf, "close.B"));
+ assertTrue(getFlag(conf, "close.C"));
+ assertTrue(getFlag(conf, "close.D"));
+ assertTrue(getFlag(conf, "close.E"));
+ }
+
+ public static class AMap extends IDMap {
+ public AMap() {
+ super("A", "A", true);
+ }
+ }
+
+ public static class BMap extends IDMap {
+ public BMap() {
+ super("B", "X", false);
+ }
+ }
+
+ public static class CReduce extends IDReduce {
+ public CReduce() {
+ super("C", "C");
+ }
+ }
+
+ public static class DMap extends IDMap {
+ public DMap() {
+ super("D", "X", false);
+ }
+ }
+
+ public static class EMap extends IDMap {
+ public EMap() {
+ super("E", "E", true);
+ }
+ }
+
+ public static class IDMap
+ implements Mapper<LongWritable, Text, LongWritable, Text> {
+ private JobConf conf;
+ private String name;
+ private String prop;
+ private boolean byValue;
+
+ public IDMap(String name, String prop, boolean byValue) {
+ this.name = name;
+ this.prop = prop;
+ this.byValue = byValue;
+ }
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ assertEquals(prop, conf.get("a"));
+ try {
+ writeFlag(conf, "configure." + name);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public void map(LongWritable key, Text value,
+ OutputCollector<LongWritable, Text> output,
+ Reporter reporter) throws IOException {
+ writeFlag(conf, "map." + name + ".value." + value);
+ key.set(10);
+ output.collect(key, value);
+ if (byValue) {
+ assertEquals(10, key.get());
+ } else {
+ assertNotSame(10, key.get());
+ }
+ key.set(11);
+ }
+
+ public void close() throws IOException {
+ try {
+ writeFlag(conf, "close." + name);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+ public static class IDReduce
+ implements Reducer<LongWritable, Text, LongWritable, Text> {
+
+ private JobConf conf;
+ private String name;
+ private String prop;
+ private boolean byValue = false;
+
+ public IDReduce(String name, String prop) {
+ this.name = name;
+ this.prop = prop;
+ }
+
+ public void configure(JobConf conf) {
+ this.conf = conf;
+ assertEquals(prop, conf.get("a"));
+ try {
+ writeFlag(conf, "configure." + name);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ public void reduce(LongWritable key, Iterator<Text> values,
+ OutputCollector<LongWritable, Text> output,
+ Reporter reporter) throws IOException {
+ while (values.hasNext()) {
+ Text value = values.next();
+ writeFlag(conf, "reduce." + name + ".value." + value);
+ key.set(10);
+ output.collect(key, value);
+ if (byValue) {
+ assertEquals(10, key.get());
+ } else {
+ assertNotSame(10, key.get());
+ }
+ key.set(11);
+ }
+ }
+
+ public void close() throws IOException {
+ try {
+ writeFlag(conf, "close." + name);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+
+}