Posted to commits@mrql.apache.org by fe...@apache.org on 2014/02/04 19:06:02 UTC

git commit: MRQL-27: Improve the build process and support hadoop 0.20.x

Updated Branches:
  refs/heads/master 55fea5b45 -> 9b76440e0


MRQL-27: Improve the build process and support hadoop 0.20.x


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/9b76440e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/9b76440e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/9b76440e

Branch: refs/heads/master
Commit: 9b76440e0c56ef9f585e6c05c56551d643420f43
Parents: 55fea5b
Author: fegaras <fe...@cse.uta.edu>
Authored: Tue Feb 4 12:05:13 2014 -0600
Committer: fegaras <fe...@cse.uta.edu>
Committed: Tue Feb 4 12:05:13 2014 -0600

----------------------------------------------------------------------
 MapReduce/pom.xml                               |  33 ++++
 bin/mrql                                        |  21 ++-
 bin/mrql.bsp                                    |  25 ++-
 conf/mrql-env.sh                                |  30 ++--
 pom.xml                                         |  14 +-
 .../MultipleInputs/DelegatingInputFormat.java   | 127 +++++++++++++++
 .../java/MultipleInputs/DelegatingMapper.java   |  54 +++++++
 .../MultipleInputs/DelegatingRecordReader.java  |  88 ++++++++++
 .../java/MultipleInputs/MultipleInputs.java     | 141 ++++++++++++++++
 .../java/MultipleInputs/TaggedInputSplit.java   | 159 +++++++++++++++++++
 10 files changed, 655 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/MapReduce/pom.xml
----------------------------------------------------------------------
diff --git a/MapReduce/pom.xml b/MapReduce/pom.xml
index 8dceee0..ab36e49 100644
--- a/MapReduce/pom.xml
+++ b/MapReduce/pom.xml
@@ -51,6 +51,39 @@
     </dependency>
   </dependencies>
 
+  <profiles>
+    <profile>
+      <id>MultipleInputs</id>
+      <activation>
+        <property>
+          <name>MultipleInputs</name>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>1.8</version>
+            <executions>
+              <execution>
+                <phase>generate-sources</phase>
+                <goals><goal>add-source</goal></goals>
+                <configuration>
+                  <sources>
+                    <source>../src/main/java/MultipleInputs</source>
+                    <source>../src/main/java/MapReduce</source>
+                    <source>${project.build.directory}/generated-sources/org/apache/mrql</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <build>
     <plugins>
       <plugin>

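Note: the MultipleInputs profile above is keyed to a property of the same
name, so it can be enabled either by profile id or by defining the property.
For example, to build against Hadoop 0.20.x (the first command is the one
documented in conf/mrql-env.sh below):

  mvn -PMultipleInputs -Dhadoop.version=0.20.2 install
  mvn -DMultipleInputs -Dhadoop.version=0.20.2 install   # equivalent, via the activation property
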
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/bin/mrql
----------------------------------------------------------------------
diff --git a/bin/mrql b/bin/mrql
index 7ea280f..11b8b89 100755
--- a/bin/mrql
+++ b/bin/mrql
@@ -31,15 +31,26 @@ MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
 GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
 CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
 MR_JAR=`ls "$MRQL_HOME"/lib/mrql-mr-*.jar`
+FULL_JAR="$MRQL_HOME/lib/mrql-mr-all.jar"
 
 export JAVA_HOME MAPRED_JOB_TRACKER FS_DEFAULT_NAME
 
+if [ ! -e "$FULL_JAR" ]; then
+   rm -rf "$MRQL_HOME/tmp/classes"
+   mkdir -p "$MRQL_HOME/tmp/classes"
+   pushd $MRQL_HOME/tmp/classes > /dev/null
+   $JAVA_HOME/bin/jar xf $CUP_JAR
+   $JAVA_HOME/bin/jar xf $JLINE_JAR
+   $JAVA_HOME/bin/jar xf $GEN_JAR
+   $JAVA_HOME/bin/jar xf $CORE_JAR
+   $JAVA_HOME/bin/jar xf $MR_JAR
+   cd ..
+   $JAVA_HOME/bin/jar cf $FULL_JAR -C classes/ .
+   popd > /dev/null
+fi
 
 if [ "$1" == "-local" ] || [ "$1" == "-dist" ]; then
-   LIBJARS="$CUP_JAR,$JLINE_JAR,$GEN_JAR,$CORE_JAR,$MR_JAR"
-   export HADOOP_CLASSPATH="$CUP_JAR:$JLINE_JAR:$GEN_JAR:$CORE_JAR:$MR_JAR"
-   $HADOOP_HOME/bin/hadoop jar $MRQL_HOME/lib/mrql-mr-*.jar org.apache.mrql.Main -libjars $LIBJARS $*
+   $HADOOP_HOME/bin/hadoop jar $FULL_JAR org.apache.mrql.Main $*
 else
-   CLASSPATH="$CUP_JAR:$JLINE_JAR:$GEN_JAR:$CORE_JAR:$MR_JAR:$HADOOP_JARS"
-   $JAVA_HOME/bin/java -classpath $CLASSPATH org.apache.mrql.Main $*
+   $JAVA_HOME/bin/java -classpath "$FULL_JAR:$HADOOP_JARS" org.apache.mrql.Main $*
 fi

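Note: bin/mrql now assembles a single fat jar, lib/mrql-mr-all.jar, on first
use and reuses it afterwards, so -libjars and HADOOP_CLASSPATH are no longer
needed in any mode. A usage sketch (the query file name is hypothetical):

  ./bin/mrql -local queries/pagerank.mrql   # local-mode Hadoop, via 'hadoop jar'
  ./bin/mrql -dist queries/pagerank.mrql    # distributed Hadoop cluster
  ./bin/mrql queries/pagerank.mrql          # no mode flag: run Main directly under java

Since the jar is only built when lib/mrql-mr-all.jar is missing, delete it to
pick up freshly rebuilt component jars.
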
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/bin/mrql.bsp
----------------------------------------------------------------------
diff --git a/bin/mrql.bsp b/bin/mrql.bsp
index c0fd179..651dbf5 100755
--- a/bin/mrql.bsp
+++ b/bin/mrql.bsp
@@ -31,29 +31,26 @@ MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"
 GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
 CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
 BSP_JAR=`ls "$MRQL_HOME"/lib/mrql-bsp-*.jar`
+FULL_JAR="$MRQL_HOME/lib/mrql-bsp-all.jar"
 
 export JAVA_HOME FS_DEFAULT_NAME BSP_MASTER_ADDRESS HAMA_ZOOKEEPER_QUORUM
 
-
-if [ "$1" == "-local" ]; then
-   LIBJARS="$CUP_JAR,$JLINE_JAR,$GEN_JAR,$CORE_JAR,$BSP_JAR"
-   export HAMA_CLASSPATH="$CUP_JAR:$JLINE_JAR:$CORE_JAR:$GEN_JAR:$BSP_JAR"
-   $HAMA_HOME/bin/hama jar $BSP_JAR org.apache.mrql.Main -libjars $LIBJARS -bsp $*
-else if [ "$1" == "-dist" ]; then
-   # Hama distributed mode -libjars has a bug; create a single jar instead
+if [ ! -e "$FULL_JAR" ]; then
+   rm -rf "$MRQL_HOME/tmp/classes"
    mkdir -p "$MRQL_HOME/tmp/classes"
-   pushd $MRQL_HOME/tmp/classes >/dev/null
+   pushd $MRQL_HOME/tmp/classes > /dev/null
    $JAVA_HOME/bin/jar xf $CUP_JAR
    $JAVA_HOME/bin/jar xf $JLINE_JAR
    $JAVA_HOME/bin/jar xf $GEN_JAR
    $JAVA_HOME/bin/jar xf $CORE_JAR
    $JAVA_HOME/bin/jar xf $BSP_JAR
    cd ..
-   $JAVA_HOME/bin/jar cf mrql-bsp.jar -C classes/ .
-   popd >/dev/null
-   $HAMA_HOME/bin/hama jar $MRQL_HOME/tmp/mrql-bsp.jar org.apache.mrql.Main -bsp $*
-else
-   HAMA_CLASSPATH="$CUP_JAR:$JLINE_JAR:$GEN_JAR:$CORE_JAR:$BSP_JAR:$HAMA_JAR:$HADOOP_JARS"
-   $JAVA_HOME/bin/java -classpath $HAMA_CLASSPATH org.apache.mrql.Main -bsp $*
+   $JAVA_HOME/bin/jar cf $FULL_JAR -C classes/ .
+   popd > /dev/null
 fi
+
+if [ "$1" == "-local" ] || [ "$1" == "-dist" ]; then
+   $HAMA_HOME/bin/hama jar $FULL_JAR org.apache.mrql.Main -bsp $*
+else
+   $JAVA_HOME/bin/java -classpath "$FULL_JAR:$HAMA_JAR:$HADOOP_JARS" org.apache.mrql.Main -bsp $*
 fi

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/conf/mrql-env.sh
----------------------------------------------------------------------
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 982f1fd..9b4dd97 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -20,9 +20,20 @@
 #
 #--------------------------------------------------------------------------------
 #
-# Set Apache MRQL-specific environment variables here.
+# To rebuild Apache MRQL from sources:
+#
+# build MRQL on Hadoop 1.x:
+# mvn -Dhadoop.version=1.0.3 install
+#
+# build MRQL on Hadoop 2.x (yarn):
+# mvn -Pyarn -Dyarn.version=2.2.0 -Dhadoop.version=1.2.1 install
+#
+# build MRQL on Hadoop 0.20.x:
+# mvn -PMultipleInputs -Dhadoop.version=0.20.2 install
 #
 #--------------------------------------------------------------------------------
+#
+# Set Apache MRQL-specific environment variables here:
 
 
 # Required: The Java installation directory
@@ -33,12 +44,12 @@ JAVA_HOME=/root/jdk
 CUP_JAR=${HOME}/.m2/repository/net/sf/squirrel-sql/thirdparty/non-maven/java-cup/11a/java-cup-11a.jar
 
 # Required: The JLine library
-# You can download from http://jline.sourceforge.net
+# You can download it from http://jline.sourceforge.net
 JLINE_JAR=${HOME}/.m2/repository/jline/jline/1.0/jline-1.0.jar
 
 
-# Required: Hadoop configuration
-HADOOP_VERSION=1.0.3
+# Required: Hadoop configuration. Supports versions 0.20.x, 1.x, 2.x
+HADOOP_VERSION=1.2.1
 # The Hadoop installation directory
 HADOOP_HOME=${HOME}/hadoop-${HADOOP_VERSION}
 # The Hadoop job tracker (as defined in mapred-site.xml)
@@ -56,7 +67,7 @@ BSP_MASTER_ADDRESS=localhost:40000
 HAMA_ZOOKEEPER_QUORUM=localhost
 
 
-# Optional: Spark configuration
+# Optional: Spark configuration. Supports 0.8.1 only
 SPARK_HOME=${HOME}/spark-0.8.1-incubating-bin-hadoop1
 # URI of the Spark master node
 SPARK_MASTER=spark://crete:7077
@@ -66,11 +77,12 @@ SPARK_MEM="1g"
 
 # Classpaths
 
-HADOOP_JARS=${HADOOP_HOME}/hadoop-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/lib/commons-logging-1.1.1.jar:${HADOOP_HOME}/lib/log4j-1.2.15.jar:${HADOOP_HOME}/lib/commons-cli-1.2.jar
-
 HAMA_JAR=${HAMA_HOME}/hama-core-${HAMA_VERSION}.jar
 
 SPARK_JARS=${SPARK_HOME}/assembly/target/scala-2.9.3/*
 
-# for hadoop yarn (build using eg,  mvn -Pyarn -Dhadoop.version=2.2.0 install)
-#HADOOP_JARS=${HADOOP_HOME}/share/hadoop/common/hadoop-common-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-annotations-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/commons-cli-1.2.jar
+if [[ ${HADOOP_VERSION} =~ ^2\. ]]; then
+   HADOOP_JARS=${HADOOP_HOME}/share/hadoop/common/hadoop-common-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/hadoop-annotations-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/log4j-1.2.17.jar:${HADOOP_HOME}/share/hadoop/common/lib/commons-cli-1.2.jar
+else
+   HADOOP_JARS=${HADOOP_HOME}/hadoop-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/lib/commons-logging-1.1.1.jar:${HADOOP_HOME}/lib/log4j-1.2.15.jar:${HADOOP_HOME}/lib/commons-cli-1.2.jar
+fi

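Note on the version test above: the right-hand side of bash's [[ =~ ]] must
be left unquoted to be treated as a regular expression; quoted characters are
matched literally. A quick check:

  v=2.2.0
  [[ $v =~ ^2\. ]]    && echo yarn   # regex match: prints "yarn"
  [[ $v =~ "^2.*$" ]] && echo yarn   # quoted pattern is a literal string: never matches
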
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 20b8de1..00f25ac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,7 +44,8 @@
   </parent>
 
   <properties>
-    <hadoop.version>1.0.3</hadoop.version>
+    <hadoop.version>1.2.1</hadoop.version>
+    <yarn.version>2.2.0</yarn.version>
     <hama.version>0.6.3</hama.version>
     <spark.version>0.8.1-incubating</spark.version>
   </properties>
@@ -200,17 +201,12 @@
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-common</artifactId>
-          <version>${hadoop.version}</version>
-        </dependency>
-        <dependency>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-hdfs</artifactId>
-          <version>${hadoop.version}</version>
+          <artifactId>hadoop-client</artifactId>
+          <version>${yarn.version}</version>
         </dependency>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+          <artifactId>hadoop-core</artifactId>
           <version>${hadoop.version}</version>
         </dependency>
       </dependencies>

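Note: in the yarn profile, hadoop-client now tracks ${yarn.version} while
hadoop-core stays on ${hadoop.version}, which is why the Hadoop 2.x build
command documented in conf/mrql-env.sh passes both properties:

  mvn -Pyarn -Dyarn.version=2.2.0 -Dhadoop.version=1.2.1 install
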
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/src/main/java/MultipleInputs/DelegatingInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/MultipleInputs/DelegatingInputFormat.java b/src/main/java/MultipleInputs/DelegatingInputFormat.java
new file mode 100644
index 0000000..0d9b8f9
--- /dev/null
+++ b/src/main/java/MultipleInputs/DelegatingInputFormat.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputFormat} that delegates behavior of paths to multiple other
+ * InputFormats.
+ * 
+ * @see MultipleInputs#addInputPath(Job, Path, Class, Class)
+ */
+public class DelegatingInputFormat<K, V> extends InputFormat<K, V> {
+
+  @SuppressWarnings("unchecked")
+  public List<InputSplit> getSplits(JobContext job) 
+      throws IOException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    Job jobCopy = new Job(conf);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    Map<Path, InputFormat> formatMap = 
+      MultipleInputs.getInputFormatMap(job);
+    Map<Path, Class<? extends Mapper>> mapperMap = MultipleInputs
+       .getMapperTypeMap(job);
+    Map<Class<? extends InputFormat>, List<Path>> formatPaths
+        = new HashMap<Class<? extends InputFormat>, List<Path>>();
+
+    // First, build a map of InputFormats to Paths
+    for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
+      if (!formatPaths.containsKey(entry.getValue().getClass())) {
+       formatPaths.put(entry.getValue().getClass(), new LinkedList<Path>());
+      }
+
+      formatPaths.get(entry.getValue().getClass()).add(entry.getKey());
+    }
+
+    for (Entry<Class<? extends InputFormat>, List<Path>> formatEntry : 
+        formatPaths.entrySet()) {
+      Class<? extends InputFormat> formatClass = formatEntry.getKey();
+      InputFormat format = (InputFormat) ReflectionUtils.newInstance(
+         formatClass, conf);
+      List<Path> paths = formatEntry.getValue();
+
+      Map<Class<? extends Mapper>, List<Path>> mapperPaths
+          = new HashMap<Class<? extends Mapper>, List<Path>>();
+
+      // Now, for each set of paths that have a common InputFormat, build
+      // a map of Mappers to the paths they're used for
+      for (Path path : paths) {
+       Class<? extends Mapper> mapperClass = mapperMap.get(path);
+       if (!mapperPaths.containsKey(mapperClass)) {
+         mapperPaths.put(mapperClass, new LinkedList<Path>());
+       }
+
+       mapperPaths.get(mapperClass).add(path);
+      }
+
+      // Now each set of paths that has a common InputFormat and Mapper can
+      // be added to the same job, and split together.
+      for (Entry<Class<? extends Mapper>, List<Path>> mapEntry :
+          mapperPaths.entrySet()) {
+       paths = mapEntry.getValue();
+       Class<? extends Mapper> mapperClass = mapEntry.getKey();
+
+       if (mapperClass == null) {
+         try {
+           mapperClass = job.getMapperClass();
+         } catch (ClassNotFoundException e) {
+           throw new IOException("Mapper class is not found", e);
+         }
+       }
+
+       FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths
+           .size()]));
+
+       // Get splits for each input path and tag with InputFormat
+       // and Mapper types by wrapping in a TaggedInputSplit.
+       List<InputSplit> pathSplits = format.getSplits(jobCopy);
+       for (InputSplit pathSplit : pathSplits) {
+         splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
+             mapperClass));
+       }
+      }
+    }
+
+    return splits;
+  }
+
+  @Override
+  public RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new DelegatingRecordReader<K, V>(split, context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/src/main/java/MultipleInputs/DelegatingMapper.java
----------------------------------------------------------------------
diff --git a/src/main/java/MultipleInputs/DelegatingMapper.java b/src/main/java/MultipleInputs/DelegatingMapper.java
new file mode 100644
index 0000000..d72d2b5
--- /dev/null
+++ b/src/main/java/MultipleInputs/DelegatingMapper.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link Mapper} that delegates behavior of paths to multiple other
+ * mappers.
+ * 
+ * @see MultipleInputs#addInputPath(Job, Path, Class, Class)
+ */
+public class DelegatingMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
+
+  private Mapper<K1, V1, K2, V2> mapper;
+
+  @SuppressWarnings("unchecked")
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    // Find the Mapper from the TaggedInputSplit.
+    TaggedInputSplit inputSplit = (TaggedInputSplit) context.getInputSplit();
+    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
+       .getMapperClass(), context.getConfiguration());
+    
+  }
+
+  @SuppressWarnings("unchecked")
+  public void run(Context context) 
+      throws IOException, InterruptedException {
+    setup(context);
+    mapper.run(context);
+    cleanup(context);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/src/main/java/MultipleInputs/DelegatingRecordReader.java
----------------------------------------------------------------------
diff --git a/src/main/java/MultipleInputs/DelegatingRecordReader.java b/src/main/java/MultipleInputs/DelegatingRecordReader.java
new file mode 100644
index 0000000..f0d060e
--- /dev/null
+++ b/src/main/java/MultipleInputs/DelegatingRecordReader.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This is a delegating RecordReader, which delegates the functionality to the
+ * underlying record reader in {@link TaggedInputSplit}  
+ */
+public class DelegatingRecordReader<K, V> extends RecordReader<K, V> {
+  RecordReader<K, V> originalRR;
+
+  /**
+   * Constructs the DelegatingRecordReader.
+   * 
+   * @param split TaggedInputSplit object
+   * @param context TaskAttemptContext object
+   *  
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @SuppressWarnings("unchecked")
+  public DelegatingRecordReader(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    // Find the InputFormat and then the RecordReader from the
+    // TaggedInputSplit.
+    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+        .newInstance(taggedInputSplit.getInputFormatClass(), context
+            .getConfiguration());
+    originalRR = inputFormat.createRecordReader(taggedInputSplit
+        .getInputSplit(), context);
+  }
+
+  @Override
+  public void close() throws IOException {
+    originalRR.close();
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return originalRR.getCurrentKey();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return originalRR.getCurrentValue();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return originalRR.getProgress();
+  }
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    originalRR.initialize(((TaggedInputSplit) split).getInputSplit(), context);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    return originalRR.nextKeyValue();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/src/main/java/MultipleInputs/MultipleInputs.java
----------------------------------------------------------------------
diff --git a/src/main/java/MultipleInputs/MultipleInputs.java b/src/main/java/MultipleInputs/MultipleInputs.java
new file mode 100644
index 0000000..9140512
--- /dev/null
+++ b/src/main/java/MultipleInputs/MultipleInputs.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class supports MapReduce jobs that have multiple input paths with
+ * a different {@link InputFormat} and {@link Mapper} for each path 
+ */
+public class MultipleInputs {
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} to the list of
+   * inputs for the map-reduce job.
+   * 
+   * @param job The {@link Job}
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   */
+  @SuppressWarnings("unchecked")
+  public static void addInputPath(Job job, Path path,
+      Class<? extends InputFormat> inputFormatClass) {
+    String inputFormatMapping = path.toString() + ";"
+       + inputFormatClass.getName();
+    Configuration conf = job.getConfiguration();
+    String inputFormats = conf.get("mapred.input.dir.formats");
+    conf.set("mapred.input.dir.formats",
+       inputFormats == null ? inputFormatMapping : inputFormats + ","
+           + inputFormatMapping);
+
+    job.setInputFormatClass(DelegatingInputFormat.class);
+  }
+
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} and
+   * {@link Mapper} to the list of inputs for the map-reduce job.
+   * 
+   * @param job The {@link Job}
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   * @param mapperClass {@link Mapper} class to use for this path
+   */
+  @SuppressWarnings("unchecked")
+  public static void addInputPath(Job job, Path path,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+
+    addInputPath(job, path, inputFormatClass);
+    Configuration conf = job.getConfiguration();
+    String mapperMapping = path.toString() + ";" + mapperClass.getName();
+    String mappers = conf.get("mapred.input.dir.mappers");
+    conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
+       : mappers + "," + mapperMapping);
+
+    job.setMapperClass(DelegatingMapper.class);
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
+   * that should be used for them.
+   * 
+   * @param job The {@link JobContext}
+   * @see #addInputPath(Job, Path, Class)
+   * @return A map of paths to inputformats for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, InputFormat> getInputFormatMap(JobContext job) {
+    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
+    Configuration conf = job.getConfiguration();
+    String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      InputFormat inputFormat;
+      try {
+       inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
+           .getClassByName(split[1]), conf);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), inputFormat);
+    }
+    return m;
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
+   * should be used for them.
+   * 
+   * @param job The {@link JobContext}
+   * @see #addInputPath(Job, Path, Class, Class)
+   * @return A map of paths to mappers for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, Class<? extends Mapper>> 
+      getMapperTypeMap(JobContext job) {
+    Configuration conf = job.getConfiguration();
+    if (conf.get("mapred.input.dir.mappers") == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Class<? extends Mapper>> m = 
+      new HashMap<Path, Class<? extends Mapper>>();
+    String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      Class<? extends Mapper> mapClass;
+      try {
+       mapClass = 
+         (Class<? extends Mapper>) conf.getClassByName(split[1]);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), mapClass);
+    }
+    return m;
+  }
+}

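For reference, a minimal driver sketch for this API (the class names, input
paths, and mapper bodies are hypothetical, and the SequenceFile input is
assumed to hold Text/Text pairs):

  import java.io.IOException;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

  public class TwoSourceJob {
    /** Mapper for the plain-text input. */
    public static class LogMapper extends Mapper<LongWritable, Text, Text, Text> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(new Text("log"), value);    // tag each text line
      }
    }

    /** Mapper for the SequenceFile input (assumed Text/Text records). */
    public static class ArchiveMapper extends Mapper<Text, Text, Text, Text> {
      @Override
      protected void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(key, value);                // pass records through
      }
    }

    public static void main(String[] args) throws Exception {
      Job job = new Job(new Configuration(), "two-source example");
      job.setJarByClass(TwoSourceJob.class);
      // each call records a path;class mapping in the job configuration and
      // installs DelegatingInputFormat/DelegatingMapper on the job
      MultipleInputs.addInputPath(job, new Path("in/logs"),
                                  TextInputFormat.class, LogMapper.class);
      MultipleInputs.addInputPath(job, new Path("in/archive"),
                                  SequenceFileInputFormat.class, ArchiveMapper.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      job.setOutputFormatClass(TextOutputFormat.class);
      FileOutputFormat.setOutputPath(job, new Path("out"));
      System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
  }

One caveat that follows from the encoding: the mappings are stored as
','-separated "path;class" entries in the job configuration, so input paths
containing ',' or ';' cannot be used with this class.
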
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/9b76440e/src/main/java/MultipleInputs/TaggedInputSplit.java
----------------------------------------------------------------------
diff --git a/src/main/java/MultipleInputs/TaggedInputSplit.java b/src/main/java/MultipleInputs/TaggedInputSplit.java
new file mode 100644
index 0000000..68bb789
--- /dev/null
+++ b/src/main/java/MultipleInputs/TaggedInputSplit.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputSplit} that tags another InputSplit with extra data for use
+ * by {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
+ */
+class TaggedInputSplit extends InputSplit implements Configurable, Writable {
+
+  private Class<? extends InputSplit> inputSplitClass;
+
+  private InputSplit inputSplit;
+
+  @SuppressWarnings("unchecked")
+  private Class<? extends InputFormat> inputFormatClass;
+
+  @SuppressWarnings("unchecked")
+  private Class<? extends Mapper> mapperClass;
+
+  private Configuration conf;
+
+  public TaggedInputSplit() {
+    // Default constructor.
+  }
+
+  /**
+   * Creates a new TaggedInputSplit.
+   * 
+   * @param inputSplit The InputSplit to be tagged
+   * @param conf The configuration to use
+   * @param inputFormatClass The InputFormat class to use for this job
+   * @param mapperClass The Mapper class to use for this job
+   */
+  @SuppressWarnings("unchecked")
+  public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+    this.inputSplitClass = inputSplit.getClass();
+    this.inputSplit = inputSplit;
+    this.conf = conf;
+    this.inputFormatClass = inputFormatClass;
+    this.mapperClass = mapperClass;
+  }
+
+  /**
+   * Retrieves the original InputSplit.
+   * 
+   * @return The InputSplit that was tagged
+   */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /**
+   * Retrieves the InputFormat class to use for this split.
+   * 
+   * @return The InputFormat class to use
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  /**
+   * Retrieves the Mapper class to use for this split.
+   * 
+   * @return The Mapper class to use
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Mapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  public long getLength() throws IOException, InterruptedException {
+    return inputSplit.getLength();
+  }
+
+  public String[] getLocations() throws IOException, InterruptedException {
+    return inputSplit.getLocations();
+  }
+
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+    inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
+    mapperClass = (Class<? extends Mapper<?, ?, ?, ?>>) readClass(in);
+    inputSplit = (InputSplit) ReflectionUtils
+       .newInstance(inputSplitClass, conf);
+    SerializationFactory factory = new SerializationFactory(conf);
+    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
+    deserializer.open((DataInputStream)in);
+    inputSplit = (InputSplit)deserializer.deserialize(inputSplit);
+  }
+
+  private Class<?> readClass(DataInput in) throws IOException {
+    String className = Text.readString(in);
+    try {
+      return conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("readObject can't find class", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, inputSplitClass.getName());
+    Text.writeString(out, inputFormatClass.getName());
+    Text.writeString(out, mapperClass.getName());
+    SerializationFactory factory = new SerializationFactory(conf);
+    Serializer serializer = 
+          factory.getSerializer(inputSplitClass);
+    serializer.open((DataOutputStream)out);
+    serializer.serialize(inputSplit);
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}
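
A round-trip sketch of the Writable contract above (hypothetical; the class
is package-private, so the sketch must live in
org.apache.hadoop.mapreduce.lib.input, and a Configuration has to be injected
via setConf before readFields so that readClass can resolve the class names):

  package org.apache.hadoop.mapreduce.lib.input;

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.Mapper;

  public class TaggedInputSplitRoundTrip {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      TaggedInputSplit original = new TaggedInputSplit(
          new FileSplit(new Path("in/part-00000"), 0L, 1024L, new String[0]),
          conf, TextInputFormat.class, Mapper.class);

      // write(): three class names, then the wrapped split via SerializationFactory
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      original.write(new DataOutputStream(buffer));

      TaggedInputSplit copy = new TaggedInputSplit();
      copy.setConf(conf);   // must precede readFields
      copy.readFields(new DataInputStream(
          new ByteArrayInputStream(buffer.toByteArray())));

      System.out.println(copy.getInputFormatClass().getName());
      System.out.println(((FileSplit) copy.getInputSplit()).getPath());
    }
  }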