Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/05/01 21:59:03 UTC

svn commit: r1332858 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ src/test/org/apache/hcatalog/mapreduce/

Author: toffer
Date: Tue May  1 21:59:03 2012
New Revision: 1332858

URL: http://svn.apache.org/viewvc?rev=1332858&view=rev
Log:
HCAT-36 Support Writing Out to Multiple Tables in HCatOutputFormat (rohini via toffer)

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1332858&r1=1332857&r2=1332858&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue May  1 21:59:03 2012
@@ -72,6 +72,8 @@ Release 0.4.0 - Unreleased
   HCAT-240. Changes to HCatOutputFormat to make it use SerDes instead of StorageDriver (toffer)
 
   NEW FEATURES
+  HCAT-36 Support Writing Out to Multiple Tables in HCatOutputFormat (rohini via toffer)
+
   HCAT-342 enable hcat to work with hive bigtop rpm (thejas via gates)
 
   HCAT-287 Add data api to HCatalog (hashutosh)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1332858&r1=1332857&r2=1332858&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Tue May  1 21:59:03 2012
@@ -48,6 +48,8 @@ import org.apache.hcatalog.data.schema.H
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
 import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -64,6 +66,7 @@ import java.util.Map.Entry;
  */
 class FileOutputCommitterContainer extends OutputCommitterContainer {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class);
     private final boolean dynamicPartitioningUsed;
     private boolean partitionsDiscovered;
 
@@ -436,20 +439,18 @@ class FileOutputCommitterContainer exten
 
         partition.setParameters(params);
 
-        // Sets permissions and group name on partition dirs.
+        // Sets permissions and group name on partition dirs and files.
 
         Path partPath = new Path(partLocnRoot);
-        for(FieldSchema partKey : table.getPartitionKeys()){
-            partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
-//        LOG.info("Setting perms for "+partPath.toString());
-            fs.setPermission(partPath, perms);
-            try{
-                fs.setOwner(partPath, null, grpName);
-            } catch(AccessControlException ace){
-                // log the messages before ignoring. Currently, logging is not built in Hcatalog.
-//          LOG.warn(ace);
+        int i = 0;
+        for (FieldSchema partKey : table.getPartitionKeys()) {
+            if (i++ != 0) {
+                applyGroupAndPerms(fs, partPath, perms, grpName, false);
             }
+            partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
         }
+        // Apply the group and permissions to the leaf partition and files.
+        applyGroupAndPerms(fs, partPath, perms, grpName, true);
         if (dynamicPartitioningUsed){
             String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table,partKVs);
             if (harProcessor.isEnabled()){
@@ -466,7 +467,30 @@ class FileOutputCommitterContainer exten
         return partition;
     }
 
-
+    private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission permission,
+            String group, boolean recursive)
+            throws IOException {
+        fs.setPermission(dir, permission);
+        try {
+            fs.setOwner(dir, null, group);
+        } catch (AccessControlException ace) {
+            LOG.warn("Error changing group of " + dir, ace);
+        }
+        if (recursive) {
+            for (FileStatus fileStatus : fs.listStatus(dir)) {
+                if (fileStatus.isDir()) {
+                    applyGroupAndPerms(fs, fileStatus.getPath(), permission, group, recursive);
+                } else {
+                    fs.setPermission(fileStatus.getPath(), permission);
+                    try {
+                        fs.setOwner(fileStatus.getPath(), null, group);
+                    } catch (AccessControlException ace) {
+                        LOG.warn("Error changing group of " + fileStatus.getPath(), ace);
+                    }
+                }
+            }
+        }
+    }
 
     private String getFinalDynamicPartitionDestination(Table table, Map<String,String> partKVs) {
         // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA  ->

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java?rev=1332858&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/MultiOutputFormat.java Tue May  1 21:59:03 2012
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * The MultiOutputFormat class simplifies writing output data to multiple
+ * outputs.
+ * <p>
+ * Multiple output formats can be defined, each with its own
+ * <code>OutputFormat</code> class, key class and value class. Any
+ * configuration on these output format classes can be done without
+ * interfering with the configuration of the other output formats.
+ * <p>
+ * Usage pattern for job submission:
+ *
+ * <pre>
+ *
+ * Job job = new Job();
+ *
+ * FileInputFormat.setInputPaths(job, inDir);
+ *
+ * job.setMapperClass(WordCountMap.class);
+ * job.setReducerClass(WordCountReduce.class);
+ * job.setInputFormatClass(TextInputFormat.class);
+ * job.setOutputFormatClass(MultiOutputFormat.class);
+ * // Need not define OutputKeyClass and OutputValueClass. They default to
+ * // Writable.class
+ * job.setMapOutputKeyClass(Text.class);
+ * job.setMapOutputValueClass(IntWritable.class);
+ *
+ *
+ * // Create a JobConfigurer that will configure the job with the multiple
+ * // output format information.
+ * JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+ *
+ * // Defines an additional text based output 'text' for the job.
+ * // Any configuration for the defined OutputFormat should be done with
+ * // the Job obtained with the configurer.getJob() method.
+ * configurer.addOutputFormat("text", TextOutputFormat.class,
+ *                 IntWritable.class, Text.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("text"), textOutDir);
+ *
+ * // Defines an additional sequence-file based output 'sequence' for the job.
+ * configurer.addOutputFormat("sequence", SequenceFileOutputFormat.class,
+ *                 Text.class, IntWritable.class);
+ * FileOutputFormat.setOutputPath(configurer.getJob("sequence"), seqOutDir);
+ * ...
+ * // The configure method must be called on the JobConfigurer once all
+ * // the output formats have been defined and configured.
+ * configurer.configure();
+ *
+ * job.waitForCompletion(true);
+ * ...
+ * </pre>
+ * <p>
+ * Usage in Reducer:
+ *
+ * <pre>
+ * public class WordCountReduce extends
+ *         Reducer&lt;Text, IntWritable, Writable, Writable&gt; {
+ *
+ *     private IntWritable count = new IntWritable();
+ *
+ *     public void reduce(Text word, Iterable&lt;IntWritable&gt; values,
+ *             Context context)
+ *             throws IOException, InterruptedException {
+ *         int sum = 0;
+ *         for (IntWritable val : values) {
+ *             sum += val.get();
+ *         }
+ *         count.set(sum);
+ *         MultiOutputFormat.write(&quot;text&quot;, count, word, context);
+ *         MultiOutputFormat.write(&quot;sequence&quot;, word, count, context);
+ *     }
+ *
+ * }
+ *
+ * </pre>
+ *
+ * Map only jobs:
+ * <p>
+ * In map only jobs, <code>MultiOutputFormat.write("output", key, value,
+ * context)</code> can be called from the mapper just as from a reducer.
+ *
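+ * A minimal map-only sketch (the mapper name and index field are
+ * illustrative; it reuses the 'text' and 'sequence' aliases configured
+ * above):
+ *
+ * <pre>
+ * public class WordIndexMap extends
+ *         Mapper&lt;LongWritable, Text, Writable, Writable&gt; {
+ *
+ *     private IntWritable index = new IntWritable(1);
+ *
+ *     public void map(LongWritable key, Text value, Context context)
+ *             throws IOException, InterruptedException {
+ *         // Write the index as key to 'text' and as value to 'sequence'.
+ *         MultiOutputFormat.write(&quot;text&quot;, index, value, context);
+ *         MultiOutputFormat.write(&quot;sequence&quot;, value, index, context);
+ *         index.set(index.get() + 1);
+ *     }
+ * }
+ * </pre>
+ *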
+ */
+public class MultiOutputFormat extends OutputFormat<Writable, Writable> {
+
+    private static final String MO_ALIASES = "mapreduce.multiout.aliases";
+    private static final String MO_ALIAS = "mapreduce.multiout.alias";
+    private static final String CONF_KEY_DELIM = "%%";
+    private static final String CONF_VALUE_DELIM = ";;";
+    private static final String COMMA_DELIM = ",";
+    private static final List<String> configsToOverride = new ArrayList<String>();
+    private static final List<String> configsToMerge = new ArrayList<String>();
+
+    static {
+        configsToOverride.add("mapred.output.dir");
+        configsToMerge.add(JobContext.JOB_NAMENODES);
+        configsToMerge.add("tmpfiles");
+        configsToMerge.add("tmpjars");
+        configsToMerge.add("tmparchives");
+    }
+
+    /**
+     * Get a JobConfigurer instance that will support configuration of the job
+     * for multiple output formats.
+     *
+     * @param job the mapreduce job to be submitted
+     * @return a JobConfigurer instance for the job
+     */
+    public static JobConfigurer createConfigurer(Job job) {
+        return JobConfigurer.create(job);
+    }
+
+    /**
+     * Write the output key and value using the OutputFormat defined by the
+     * alias.
+     *
+     * @param alias the name given to the OutputFormat configuration
+     * @param key the output key to be written
+     * @param value the output value to be written
+     * @param context the Mapper or Reducer Context
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public static <K, V> void write(String alias, K key, V value, TaskInputOutputContext context)
+            throws IOException, InterruptedException {
+        KeyValue<K, V> keyval = new KeyValue<K, V>(key, value);
+        context.write(new Text(alias), keyval);
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
+        for (String alias : getOutputFormatAliases(context)) {
+            JobContext aliasContext = getJobContext(alias, context);
+            OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
+            outputFormat.checkOutputSpecs(aliasContext);
+            // Copy credentials and any new config added back to JobContext
+            context.getCredentials().addAll(aliasContext.getCredentials());
+            setAliasConf(alias, context, aliasContext);
+        }
+    }
+
+    @Override
+    public RecordWriter<Writable, Writable> getRecordWriter(TaskAttemptContext context)
+            throws IOException,
+            InterruptedException {
+        return new MultiRecordWriter(context);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException,
+            InterruptedException {
+        return new MultiOutputCommitter(context);
+    }
+
+    private static OutputFormat<?, ?> getOutputFormatInstance(JobContext context) {
+        OutputFormat<?, ?> outputFormat;
+        try {
+            outputFormat = ReflectionUtils.newInstance(context.getOutputFormatClass(),
+                    context.getConfiguration());
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException(e);
+        }
+        return outputFormat;
+    }
+
+    private static String[] getOutputFormatAliases(JobContext context) {
+        return context.getConfiguration().getStrings(MO_ALIASES);
+    }
+
+    /**
+     * Compare the aliasContext with userJob and add the differing configuration
+     * as mapreduce.multiout.alias.<aliasname>.conf to the userJob.
+     * <p>
+     * Merge configs like tmpjars, tmpfiles, tmparchives and
+     * mapreduce.job.hdfs-servers, which are handled directly by the
+     * JobClient, and add them to userJob.
+     * <p>
+     * Add mapred.output.dir config to userJob.
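+     * <p>
+     * For example (illustrative value), an alias 'text' whose only differing
+     * setting is mapred.output.dir would be stored in the user job as
+     * mapreduce.multiout.alias.text.conf = mapred.output.dir%%/out/text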
+     *
+     * @param alias alias name associated with an OutputFormat
+     * @param userJob reference to Job that the user is going to submit
+     * @param aliasContext JobContext populated with OutputFormat related
+     *            configuration.
+     */
+    private static void setAliasConf(String alias, JobContext userJob, JobContext aliasContext) {
+        Configuration userConf = userJob.getConfiguration();
+        StringBuilder builder = new StringBuilder();
+        for (Entry<String, String> conf : aliasContext.getConfiguration()) {
+            String key = conf.getKey();
+            String value = conf.getValue();
+            String jobValue = userConf.getRaw(key);
+            if (jobValue == null || !jobValue.equals(value)) {
+                if (configsToMerge.contains(key)) {
+                    String mergedValue = getMergedConfValue(jobValue, value);
+                    userConf.set(key, mergedValue);
+                } else {
+                    if (configsToOverride.contains(key)) {
+                        userConf.set(key, value);
+                    }
+                    builder.append(key).append(CONF_KEY_DELIM).append(value)
+                            .append(CONF_VALUE_DELIM);
+                }
+            }
+        }
+        if (builder.length() > 0) {
+            // Strip the trailing CONF_VALUE_DELIM before storing.
+            builder.delete(builder.length() - CONF_VALUE_DELIM.length(), builder.length());
+        }
+        userConf.set(getAliasConfName(alias), builder.toString());
+    }
+
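+    /**
+     * Merge two comma separated config values into one de-duplicated comma
+     * separated list. For example (illustrative values), merging
+     * "a.jar,b.jar" with "b.jar,c.jar" yields the three distinct entries
+     * a.jar, b.jar and c.jar in set iteration order.
+     */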
+    private static String getMergedConfValue(String originalValues, String newValues) {
+        if (originalValues == null) {
+            return newValues;
+        }
+        Set<String> mergedValues = new HashSet<String>();
+        mergedValues.addAll(StringUtils.getStringCollection(originalValues));
+        mergedValues.addAll(StringUtils.getStringCollection(newValues));
+        StringBuilder builder = new StringBuilder(originalValues.length() + newValues.length() + 2);
+        for (String value : mergedValues) {
+            builder.append(value).append(COMMA_DELIM);
+        }
+        return builder.substring(0, builder.length() - COMMA_DELIM.length());
+    }
+
+    private static JobContext getJobContext(String alias, JobContext context) {
+        String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
+        JobContext aliasContext = new JobContext(context.getConfiguration(), context.getJobID());
+        addToConfig(aliasConf, aliasContext.getConfiguration());
+        return aliasContext;
+    }
+
+    private static TaskAttemptContext getTaskContext(String alias, TaskAttemptContext context) {
+        String aliasConf = context.getConfiguration().get(getAliasConfName(alias));
+        TaskAttemptContext aliasContext = new TaskAttemptContext(context.getConfiguration(),
+                context.getTaskAttemptID());
+        addToConfig(aliasConf, aliasContext.getConfiguration());
+        return aliasContext;
+    }
+
+    private static String getAliasConfName(String alias) {
+        return MO_ALIAS + "." + alias + ".conf";
+    }
+
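+    /**
+     * Parse an alias conf string stored by setAliasConf, which alternates
+     * keys and values using the %% and ;; delimiters, and set each key/value
+     * pair on the given Configuration.
+     */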
+    private static void addToConfig(String aliasConf, Configuration conf) {
+        String[] config = aliasConf.split(CONF_KEY_DELIM + "|" + CONF_VALUE_DELIM);
+        for (int i = 0; i < config.length; i += 2) {
+            conf.set(config[i], config[i + 1]);
+        }
+    }
+
+    /**
+     * Class that supports configuration of the job for multiple output formats.
+     */
+    public static class JobConfigurer {
+
+        private final Job job;
+        private Map<String, Job> outputConfigs = new LinkedHashMap<String, Job>();
+
+        private JobConfigurer(Job job) {
+            this.job = job;
+        }
+
+        private static JobConfigurer create(Job job) {
+            JobConfigurer configurer = new JobConfigurer(job);
+            return configurer;
+        }
+
+        /**
+         * Add an OutputFormat configuration to the Job with an alias name.
+         *
+         * @param alias the name to be given to the OutputFormat configuration
+         * @param outputFormatClass OutputFormat class
+         * @param keyClass the key class for the output data
+         * @param valueClass the value class for the output data
+         * @throws IOException
+         */
+        public void addOutputFormat(String alias,
+                Class<? extends OutputFormat> outputFormatClass,
+                Class<?> keyClass, Class<?> valueClass) throws IOException {
+            Job copy = new Job(this.job.getConfiguration());
+            outputConfigs.put(alias, copy);
+            copy.setOutputFormatClass(outputFormatClass);
+            copy.setOutputKeyClass(keyClass);
+            copy.setOutputValueClass(valueClass);
+        }
+
+        /**
+         * Get the Job configuration for an OutputFormat defined by the alias
+         * name. The job returned by this method should be passed to the
+         * OutputFormat for any configuration instead of the Job that will be
+         * submitted to the JobClient.
+         *
+         * @param alias the name used for the OutputFormat during
+         *            addOutputFormat
+         * @return the Job copy associated with the alias
+         */
+        public Job getJob(String alias) {
+            Job copy = outputConfigs.get(alias);
+            if (copy == null) {
+                throw new IllegalArgumentException("OutputFormat with alias " + alias
+                        + " has not been added");
+            }
+            return copy;
+        }
+
+        /**
+         * Configure the job with the multiple output formats added. This method
+         * should be called after all the output formats have been added and
+         * configured and before the job submission.
+         */
+        public void configure() {
+            StringBuilder aliases = new StringBuilder();
+            Configuration jobConf = job.getConfiguration();
+            for (Entry<String, Job> entry : outputConfigs.entrySet()) {
+                // Copy credentials
+                job.getCredentials().addAll(entry.getValue().getCredentials());
+                String alias = entry.getKey();
+                aliases.append(alias).append(COMMA_DELIM);
+                // Store the differing configuration for each alias in the job
+                // as a setting.
+                setAliasConf(alias, job, entry.getValue());
+            }
+            aliases.delete(aliases.length() - COMMA_DELIM.length(), aliases.length());
+            jobConf.set(MO_ALIASES, aliases.toString());
+        }
+
+    }
+
+    private static class KeyValue<K, V> implements Writable {
+        private final K key;
+        private final V value;
+
+        public KeyValue(K key, V value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public K getKey() {
+            return key;
+        }
+
+        public V getValue() {
+            return value;
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            // Ignore. Not required as this will never be
+            // serialized/deserialized.
+        }
+
+        @Override
+        public void readFields(DataInput in) throws IOException {
+            // Ignore. Not required as this will never be
+            // serialized/deserialized.
+        }
+    }
+
+    private static class MultiRecordWriter extends RecordWriter<Writable, Writable> {
+
+        private final Map<String, BaseRecordWriterContainer> baseRecordWriters;
+
+        public MultiRecordWriter(TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            baseRecordWriters = new LinkedHashMap<String, BaseRecordWriterContainer>();
+            String[] aliases = getOutputFormatAliases(context);
+            for (String alias : aliases) {
+                TaskAttemptContext aliasContext = getTaskContext(alias, context);
+                Configuration aliasConf = aliasContext.getConfiguration();
+                // Create output directory if not already created.
+                String outDir = aliasConf.get("mapred.output.dir");
+                if (outDir != null) {
+                    Path outputDir = new Path(outDir);
+                    FileSystem fs = outputDir.getFileSystem(aliasConf);
+                    if (!fs.exists(outputDir)) {
+                        fs.mkdirs(outputDir);
+                    }
+                }
+                OutputFormat<?, ?> outputFormat = getOutputFormatInstance(aliasContext);
+                baseRecordWriters.put(alias,
+                        new BaseRecordWriterContainer(outputFormat.getRecordWriter(aliasContext),
+                                aliasContext));
+            }
+        }
+
+        @Override
+        public void write(Writable key, Writable value) throws IOException, InterruptedException {
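+            // The Text key carries the alias name; unwrap the actual key and
+            // value from the KeyValue holder and delegate to the base
+            // RecordWriter registered for that alias.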
+            Text _key = (Text) key;
+            KeyValue _value = (KeyValue) value;
+            String alias = new String(_key.getBytes(), 0, _key.getLength());
+            BaseRecordWriterContainer baseRWContainer = baseRecordWriters.get(alias);
+            if (baseRWContainer == null) {
+                throw new IllegalArgumentException("OutputFormat with alias " + alias
+                        + " has not been added");
+            }
+            baseRWContainer.getRecordWriter().write(_value.getKey(), _value.getValue());
+        }
+
+        @Override
+        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
+            for (BaseRecordWriterContainer baseRWContainer : baseRecordWriters.values()) {
+                baseRWContainer.getRecordWriter().close(baseRWContainer.getContext());
+            }
+        }
+
+    }
+
+    private static class BaseRecordWriterContainer {
+
+        private final RecordWriter recordWriter;
+        private final TaskAttemptContext context;
+
+        public BaseRecordWriterContainer(RecordWriter recordWriter, TaskAttemptContext context) {
+            this.recordWriter = recordWriter;
+            this.context = context;
+        }
+
+        public RecordWriter getRecordWriter() {
+            return recordWriter;
+        }
+
+        public TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+    private class MultiOutputCommitter extends OutputCommitter {
+
+        private final Map<String, BaseOutputCommitterContainer> outputCommitters;
+
+        public MultiOutputCommitter(TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            outputCommitters = new LinkedHashMap<String, MultiOutputFormat.BaseOutputCommitterContainer>();
+            String[] aliases = getOutputFormatAliases(context);
+            for (String alias : aliases) {
+                TaskAttemptContext aliasContext = getTaskContext(alias, context);
+                OutputCommitter baseCommitter = getOutputFormatInstance(aliasContext)
+                        .getOutputCommitter(aliasContext);
+                outputCommitters.put(alias,
+                        new BaseOutputCommitterContainer(baseCommitter, aliasContext));
+            }
+        }
+
+        @Override
+        public void setupJob(JobContext jobContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().setupJob(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext taskContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().setupTask(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+            boolean needTaskCommit = false;
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                needTaskCommit = needTaskCommit
+                        || outputContainer.getBaseCommitter().needsTaskCommit(
+                                outputContainer.getContext());
+            }
+            return needTaskCommit;
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext taskContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().commitTask(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public void abortTask(TaskAttemptContext taskContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().abortTask(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().commitJob(outputContainer.getContext());
+            }
+        }
+
+        @Override
+        public void abortJob(JobContext jobContext, State state) throws IOException {
+            for (String alias : outputCommitters.keySet()) {
+                BaseOutputCommitterContainer outputContainer = outputCommitters.get(alias);
+                outputContainer.getBaseCommitter().abortJob(outputContainer.getContext(), state);
+            }
+        }
+    }
+
+    private static class BaseOutputCommitterContainer {
+
+        private final OutputCommitter outputCommitter;
+        private final TaskAttemptContext context;
+
+        public BaseOutputCommitterContainer(OutputCommitter outputCommitter,
+                TaskAttemptContext context) {
+            this.outputCommitter = outputCommitter;
+            this.context = context;
+        }
+
+        public OutputCommitter getBaseCommitter() {
+            return outputCommitter;
+        }
+
+        public TaskAttemptContext getContext() {
+            return context;
+        }
+    }
+
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java?rev=1332858&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatMultiOutputFormat.java Tue May  1 21:59:03 2012
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHCatMultiOutputFormat {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestHCatMultiOutputFormat.class);
+
+    private static final String DATABASE = "default";
+    private static final String[] tableNames = {"test1", "test2", "test3"};
+    private static final String[] tablePerms = {"755", "750", "700"};
+    private static Path warehousedir = null;
+    private static HashMap<String, HCatSchema> schemaMap = new HashMap<String, HCatSchema>();
+    private static HiveMetaStoreClient hmsc;
+    private static MiniMRCluster mrCluster;
+    private static Configuration mrConf;
+    private static HiveConf hiveConf;
+    private static File workDir;
+
+    private static final String msPort = "20199";
+    private static Thread t;
+
+    static {
+        schemaMap.put(tableNames[0], new HCatSchema(ColumnHolder.hCattest1Cols));
+        schemaMap.put(tableNames[1], new HCatSchema(ColumnHolder.hCattest2Cols));
+        schemaMap.put(tableNames[2], new HCatSchema(ColumnHolder.hCattest3Cols));
+    }
+
+    private static class RunMS implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                String warehouseConf = HiveConf.ConfVars.METASTOREWAREHOUSE.varname + "="
+                        + warehousedir.toString();
+                HiveMetaStore.main(new String[] {"-v", "-p", msPort, "--hiveconf", warehouseConf});
+            } catch (Throwable t) {
+                System.err.println("Exiting. Got exception from metastore: " + t.getMessage());
+            }
+        }
+
+    }
+
+    /**
+     * Private class which holds all the data for the test cases
+     */
+    private static class ColumnHolder {
+
+        private static ArrayList<HCatFieldSchema> hCattest1Cols = new ArrayList<HCatFieldSchema>();
+        private static ArrayList<HCatFieldSchema> hCattest2Cols = new ArrayList<HCatFieldSchema>();
+        private static ArrayList<HCatFieldSchema> hCattest3Cols = new ArrayList<HCatFieldSchema>();
+
+        private static ArrayList<FieldSchema> partitionCols = new ArrayList<FieldSchema>();
+        private static ArrayList<FieldSchema> test1Cols = new ArrayList<FieldSchema>();
+        private static ArrayList<FieldSchema> test2Cols = new ArrayList<FieldSchema>();
+        private static ArrayList<FieldSchema> test3Cols = new ArrayList<FieldSchema>();
+
+        private static HashMap<String, List<FieldSchema>> colMapping = new HashMap<String, List<FieldSchema>>();
+
+        static {
+            try {
+                FieldSchema keyCol = new FieldSchema("key", Constants.STRING_TYPE_NAME, "");
+                test1Cols.add(keyCol);
+                test2Cols.add(keyCol);
+                test3Cols.add(keyCol);
+                hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+                hCattest2Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+                hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(keyCol));
+                FieldSchema valueCol = new FieldSchema("value", Constants.STRING_TYPE_NAME, "");
+                test1Cols.add(valueCol);
+                test3Cols.add(valueCol);
+                hCattest1Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol));
+                hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(valueCol));
+                FieldSchema extraCol = new FieldSchema("extra", Constants.STRING_TYPE_NAME, "");
+                test3Cols.add(extraCol);
+                hCattest3Cols.add(HCatSchemaUtils.getHCatFieldSchema(extraCol));
+                colMapping.put("test1", test1Cols);
+                colMapping.put("test2", test2Cols);
+                colMapping.put("test3", test3Cols);
+            } catch (HCatException e) {
+                LOG.error("Error in setting up schema fields for the table", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        static {
+            partitionCols.add(new FieldSchema("ds", Constants.STRING_TYPE_NAME, ""));
+            partitionCols.add(new FieldSchema("cluster", Constants.STRING_TYPE_NAME, ""));
+        }
+    }
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        String testDir = System.getProperty("test.data.dir", "./");
+        testDir = testDir + "/test_multitable_" + Math.abs(new Random().nextLong()) + "/";
+        workDir = new File(new File(testDir).getCanonicalPath());
+        FileUtil.fullyDelete(workDir);
+        workDir.mkdirs();
+
+        warehousedir = new Path(workDir + "/warehouse");
+
+        // Run hive metastore server
+        t = new Thread(new RunMS());
+        t.start();
+
+        // LocalJobRunner does not work with mapreduce OutputCommitter. So need
+        // to use MiniMRCluster. MAPREDUCE-2350
+        Configuration conf = new Configuration(true);
+        FileSystem fs = FileSystem.get(conf);
+        System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
+        mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+                new JobConf(conf));
+        mrConf = mrCluster.createJobConf();
+        fs.mkdirs(warehousedir);
+
+        initializeSetup();
+    }
+
+    private static void initializeSetup() throws Exception {
+
+        hiveConf = new HiveConf(mrConf, TestHCatMultiOutputFormat.class);
+        hiveConf.set("hive.metastore.local", "false");
+        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort);
+        hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTRETRIES, 3);
+
+        hiveConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        System.setProperty(HiveConf.ConfVars.PREEXECHOOKS.varname, " ");
+        System.setProperty(HiveConf.ConfVars.POSTEXECHOOKS.varname, " ");
+
+        hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousedir.toString());
+        try {
+            hmsc = new HiveMetaStoreClient(hiveConf, null);
+            initializeTables();
+        } catch (Throwable e) {
+            LOG.error("Exception encountered while setting up testcase", e);
+            throw new Exception(e);
+        } finally {
+            hmsc.close();
+        }
+    }
+
+    private static void initializeTables() throws Exception {
+        for (String table : tableNames) {
+            try {
+                if (hmsc.getTable(DATABASE, table) != null) {
+                    hmsc.dropTable(DATABASE, table);
+                }
+            } catch (NoSuchObjectException ignored) {
+            }
+        }
+        for (int i = 0; i < tableNames.length; i++) {
+            createTable(tableNames[i], tablePerms[i]);
+        }
+    }
+
+    private static void createTable(String tableName, String tablePerm) throws Exception {
+        Table tbl = new Table();
+        tbl.setDbName(DATABASE);
+        tbl.setTableName(tableName);
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(ColumnHolder.colMapping.get(tableName));
+        tbl.setSd(sd);
+        sd.setParameters(new HashMap<String, String>());
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
+        sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
+        sd.getSerdeInfo().getParameters().put(
+                org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(
+                org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+        tbl.setPartitionKeys(ColumnHolder.partitionCols);
+
+        hmsc.createTable(tbl);
+        FileSystem fs = FileSystem.get(mrConf);
+        fs.setPermission(new Path(warehousedir, tableName), new FsPermission(tablePerm));
+    }
+
+    @AfterClass
+    public static void tearDown() throws IOException {
+        FileUtil.fullyDelete(workDir);
+        FileSystem fs = FileSystem.get(mrConf);
+        if (fs.exists(warehousedir)) {
+            fs.delete(warehousedir, true);
+        }
+        if (mrCluster != null) {
+            mrCluster.shutdown();
+        }
+    }
+
+    /**
+     * Simple test case.
+     * <ol>
+     * <li>Submits a mapred job which writes out one fixed line to each of the tables</li>
+     * <li>uses hive fetch task to read the data and see if it matches what was written</li>
+     * </ol>
+     *
+     * @throws Exception if any error occurs
+     */
+    @Test
+    public void testOutputFormat() throws Throwable {
+        HashMap<String, String> partitionValues = new HashMap<String, String>();
+        partitionValues.put("ds", "1");
+        partitionValues.put("cluster", "ag");
+        ArrayList<OutputJobInfo> infoList = new ArrayList<OutputJobInfo>();
+        infoList.add(OutputJobInfo.create("default", tableNames[0], partitionValues));
+        infoList.add(OutputJobInfo.create("default", tableNames[1], partitionValues));
+        infoList.add(OutputJobInfo.create("default", tableNames[2], partitionValues));
+
+        Job job = new Job(hiveConf, "SampleJob");
+
+        job.setMapperClass(MyMapper.class);
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(MultiOutputFormat.class);
+        job.setNumReduceTasks(0);
+
+        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+
+        for (int i = 0; i < tableNames.length; i++) {
+            configurer.addOutputFormat(tableNames[i], HCatOutputFormat.class, BytesWritable.class,
+                    HCatRecord.class);
+            HCatOutputFormat.setOutput(configurer.getJob(tableNames[i]), infoList.get(i));
+            HCatOutputFormat.setSchema(configurer.getJob(tableNames[i]),
+                    schemaMap.get(tableNames[i]));
+        }
+        configurer.configure();
+
+        Path filePath = createInputFile();
+        FileInputFormat.addInputPath(job, filePath);
+        Assert.assertTrue(job.waitForCompletion(true));
+
+        ArrayList<String> outputs = new ArrayList<String>();
+        for (String tbl : tableNames) {
+            outputs.add(getTableData(tbl, "default").get(0));
+        }
+        Assert.assertEquals("Output of table " + tableNames[0] + " is not correct",
+                "a,a,1,ag", outputs.get(0));
+        Assert.assertEquals("Output of table " + tableNames[1] + " is not correct",
+                "a,1,ag", outputs.get(1));
+        Assert.assertEquals("Output of table " + tableNames[2] + " is not correct",
+                "a,a,extra,1,ag", outputs.get(2));
+
+        // Check permissions on the partition dirs and files created
+        for (int i = 0; i < tableNames.length; i++) {
+            Path partitionFile = new Path(warehousedir + "/" + tableNames[i]
+                    + "/ds=1/cluster=ag/part-m-00000");
+            FileSystem fs = partitionFile.getFileSystem(mrConf);
+            Assert.assertEquals("File permissions of table " + tableNames[i] + " are not correct",
+                    new FsPermission(tablePerms[i]),
+                    fs.getFileStatus(partitionFile).getPermission());
+            Assert.assertEquals("File permissions of table " + tableNames[i] + " are not correct",
+                    new FsPermission(tablePerms[i]),
+                    fs.getFileStatus(partitionFile.getParent()).getPermission());
+            Assert.assertEquals("File permissions of table " + tableNames[i] + " are not correct",
+                    new FsPermission(tablePerms[i]),
+                    fs.getFileStatus(partitionFile.getParent().getParent()).getPermission());
+
+        }
+        LOG.info("File permissions verified");
+    }
+
+    /**
+     * Create an input file for the map tasks
+     *
+     * @return absolute path of the file.
+     * @throws IOException if any error encountered
+     */
+    private Path createInputFile() throws IOException {
+        Path f = new Path(workDir + "/MultiTableInput.txt");
+        FileSystem fs = FileSystem.get(mrConf);
+        if (fs.exists(f)) {
+            fs.delete(f, true);
+        }
+        OutputStream out = fs.create(f);
+        for (int i = 0; i < 3; i++) {
+            out.write("a,a\n".getBytes());
+        }
+        out.close();
+        return f;
+    }
+
+    /**
+     * Method to fetch table data
+     *
+     * @param table table name
+     * @param database database
+     * @return each row of the table as a comma separated string
+     * @throws Exception if any error occurs
+     */
+    private List<String> getTableData(String table, String database) throws Exception {
+        HiveConf conf = new HiveConf();
+        conf.addResource("hive-site.xml");
+        ArrayList<String> results = new ArrayList<String>();
+        ArrayList<String> temp = new ArrayList<String>();
+        Hive hive = Hive.get(conf);
+        org.apache.hadoop.hive.ql.metadata.Table tbl = hive.getTable(database, table);
+        FetchWork work;
+        if (!tbl.getPartCols().isEmpty()) {
+            List<Partition> partitions = hive.getPartitions(tbl);
+            List<PartitionDesc> partDesc = new ArrayList<PartitionDesc>();
+            List<String> partLocs = new ArrayList<String>();
+            for (Partition part : partitions) {
+                partLocs.add(part.getLocation());
+                partDesc.add(Utilities.getPartitionDesc(part));
+            }
+            work = new FetchWork(partLocs, partDesc);
+            work.setLimit(100);
+        } else {
+            work = new FetchWork(tbl.getDataLocation().toString(), Utilities.getTableDesc(tbl));
+        }
+        FetchTask task = new FetchTask();
+        task.setWork(work);
+        task.initialize(conf, null, null);
+        task.fetch(temp);
+        for (String str : temp) {
+            results.add(str.replace("\t", ","));
+        }
+        return results;
+    }
+
+    private static class MyMapper extends
+            Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
+
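+        // Each of the three input lines goes to a different table: line 0 to
+        // test1 (key,value), line 1 to test2 (key only) and line 2 to test3
+        // (key,value,extra). createInputFile() writes exactly three lines.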
+        private int i = 0;
+
+        @Override
+        protected void map(LongWritable key, Text value, Context context)
+                throws IOException, InterruptedException {
+            HCatRecord record = null;
+            String[] splits = value.toString().split(",");
+            switch (i) {
+            case 0:
+                record = new DefaultHCatRecord(2);
+                record.set(0, splits[0]);
+                record.set(1, splits[1]);
+                break;
+            case 1:
+                record = new DefaultHCatRecord(1);
+                record.set(0, splits[0]);
+                break;
+            case 2:
+                record = new DefaultHCatRecord(3);
+                record.set(0, splits[0]);
+                record.set(1, splits[1]);
+                record.set(2, "extra");
+                break;
+            default:
+                Assert.fail("This should not happen!!!!!");
+            }
+            MultiOutputFormat.write(tableNames[i], null, record, context);
+            i++;
+        }
+    }
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java?rev=1332858&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestMultiOutputFormat.java Tue May  1 21:59:03 2012
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Random;
+import java.util.StringTokenizer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.mapreduce.MultiOutputFormat.JobConfigurer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestMultiOutputFormat {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestMultiOutputFormat.class);
+    private static File workDir;
+    private static Configuration mrConf = null;
+    private static FileSystem fs = null;
+    private static MiniMRCluster mrCluster = null;
+
+    @BeforeClass
+    public static void setup() throws IOException {
+        createWorkDir();
+        Configuration conf = new Configuration(true);
+        fs = FileSystem.get(conf);
+        System.setProperty("hadoop.log.dir", new File(workDir, "/logs").getAbsolutePath());
+        // LocalJobRunner does not work with mapreduce OutputCommitter. So need
+        // to use MiniMRCluster. MAPREDUCE-2350
+        mrCluster = new MiniMRCluster(1, fs.getUri().toString(), 1, null, null,
+                new JobConf(conf));
+        mrConf = mrCluster.createJobConf();
+    }
+
+    private static void createWorkDir() throws IOException {
+        String testDir = System.getProperty("test.data.dir", "./");
+        testDir = testDir + "/test_multiout_" + Math.abs(new Random().nextLong()) + "/";
+        workDir = new File(new File(testDir).getCanonicalPath());
+        FileUtil.fullyDelete(workDir);
+        workDir.mkdirs();
+    }
+
+    @AfterClass
+    public static void tearDown() throws IOException {
+        if (mrCluster != null) {
+            mrCluster.shutdown();
+        }
+        FileUtil.fullyDelete(workDir);
+    }
+
+    /**
+     * A test job that reads an input file and writes each word and its index
+     * to a text file and a sequence file, with the key and value types
+     * swapped between the two outputs.
+     */
+    @Test
+    public void testMultiOutputFormatWithoutReduce() throws Throwable {
+        Job job = new Job(mrConf, "MultiOutNoReduce");
+        job.setMapperClass(MultiOutWordIndexMapper.class);
+        job.setJarByClass(this.getClass());
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(MultiOutputFormat.class);
+        job.setNumReduceTasks(0);
+
+        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+        configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
+        configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class,
+                IntWritable.class);
+        Path outDir = new Path(workDir.getPath(), job.getJobName());
+        FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1"));
+        FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2"));
+
+        String fileContent = "Hello World";
+        String inputFile = createInputFile(fileContent);
+        FileInputFormat.setInputPaths(job, new Path(inputFile));
+
+        configurer.configure();
+        Assert.assertTrue(job.waitForCompletion(true));
+
+        Path textOutPath = new Path(outDir, "out1/part-m-00000");
+        String[] textOutput = readFully(textOutPath).split("\n");
+        Path seqOutPath = new Path(outDir, "out2/part-m-00000");
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf);
+        Text key = new Text();
+        IntWritable value = new IntWritable();
+        String[] words = fileContent.split(" ");
+        Assert.assertEquals(words.length, textOutput.length);
+        LOG.info("Verifying file contents");
+        for (int i = 0; i < words.length; i++) {
+            Assert.assertEquals((i + 1) + "\t" + words[i], textOutput[i]);
+            reader.next(key, value);
+            Assert.assertEquals(words[i], key.toString());
+            Assert.assertEquals((i + 1), value.get());
+        }
+        Assert.assertFalse(reader.next(key, value));
+    }
+
+    /**
+     * A word count test job that reads an input file and writes the count of
+     * each word to a text file and a sequence file, with the key and value
+     * types swapped between the two outputs.
+     */
+    @Test
+    public void testMultiOutputFormatWithReduce() throws Throwable {
+        Job job = new Job(mrConf, "MultiOutWithReduce");
+
+        job.setMapperClass(WordCountMapper.class);
+        job.setReducerClass(MultiOutWordCountReducer.class);
+        job.setJarByClass(this.getClass());
+        job.setInputFormatClass(TextInputFormat.class);
+        job.setOutputFormatClass(MultiOutputFormat.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(IntWritable.class);
+
+        JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);
+
+        configurer.addOutputFormat("out1", TextOutputFormat.class, IntWritable.class, Text.class);
+        configurer.addOutputFormat("out2", SequenceFileOutputFormat.class, Text.class,
+                IntWritable.class);
+        Path outDir = new Path(workDir.getPath(), job.getJobName());
+        FileOutputFormat.setOutputPath(configurer.getJob("out1"), new Path(outDir, "out1"));
+        FileOutputFormat.setOutputPath(configurer.getJob("out2"), new Path(outDir, "out2"));
+
+        configurer.configure();
+
+        String fileContent = "Hello World Hello World World";
+        String inputFile = createInputFile(fileContent);
+        FileInputFormat.setInputPaths(job, new Path(inputFile));
+
+        Assert.assertTrue(job.waitForCompletion(true));
+
+        Path textOutPath = new Path(outDir, "out1/part-r-00000");
+        String[] textOutput = readFully(textOutPath).split("\n");
+        Path seqOutPath = new Path(outDir, "out2/part-r-00000");
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, seqOutPath, mrConf);
+        Text key = new Text();
+        IntWritable value = new IntWritable();
+        String[] words = "Hello World".split(" ");
+        Assert.assertEquals(words.length, textOutput.length);
+        for (int i = 0; i < words.length; i++) {
+            Assert.assertEquals((i + 2) + "\t" + words[i], textOutput[i]);
+            reader.next(key, value);
+            Assert.assertEquals(words[i], key.toString());
+            Assert.assertEquals((i + 2), value.get());
+        }
+        Assert.assertFalse(reader.next(key, value));
+
+    }
+
+
+    /**
+     * Create an input file with the given content for the map tasks
+     *
+     * @param content the data to write to the file
+     * @return absolute path of the file.
+     * @throws IOException if any error encountered
+     */
+    private String createInputFile(String content) throws IOException {
+        File f = File.createTempFile("input", "txt");
+        FileWriter writer = new FileWriter(f);
+        writer.write(content);
+        writer.close();
+        return f.getAbsolutePath();
+    }
+
+    private String readFully(Path file) throws IOException {
+        FSDataInputStream in = fs.open(file);
+        // Size the buffer from the file status; available() is not a
+        // reliable measure of the full file length.
+        byte[] b = new byte[(int) fs.getFileStatus(file).getLen()];
+        in.readFully(b);
+        in.close();
+        return new String(b);
+    }
+
+    private static class MultiOutWordIndexMapper extends
+            Mapper<LongWritable, Text, Writable, Writable> {
+
+        private IntWritable index = new IntWritable(1);
+        private Text word = new Text();
+
+        @Override
+        protected void map(LongWritable key, Text value, Context context)
+                throws IOException, InterruptedException {
+            StringTokenizer itr = new StringTokenizer(value.toString());
+            while (itr.hasMoreTokens()) {
+                word.set(itr.nextToken());
+                MultiOutputFormat.write("out1", index, word, context);
+                MultiOutputFormat.write("out2", word, index, context);
+                index.set(index.get() + 1);
+            }
+        }
+    }
+
+    private static class WordCountMapper extends
+            Mapper<LongWritable, Text, Text, IntWritable> {
+
+        private final static IntWritable one = new IntWritable(1);
+        private Text word = new Text();
+
+        @Override
+        protected void map(LongWritable key, Text value, Context context)
+                throws IOException, InterruptedException {
+            StringTokenizer itr = new StringTokenizer(value.toString());
+            while (itr.hasMoreTokens()) {
+                word.set(itr.nextToken());
+                context.write(word, one);
+            }
+        }
+    }
+
+    private static class MultiOutWordCountReducer extends
+            Reducer<Text, IntWritable, Writable, Writable> {
+
+        private IntWritable result = new IntWritable();
+
+        @Override
+        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
+                throws IOException, InterruptedException {
+            int sum = 0;
+            for (IntWritable val : values) {
+                sum += val.get();
+            }
+            result.set(sum);
+            MultiOutputFormat.write("out1", result, key, context);
+            MultiOutputFormat.write("out2", key, result, context);
+        }
+    }
+
+}